lttng-crash: support recursive traces
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
6c1c0768 20#define _LGPL_SOURCE
3bd1e081 21#include <assert.h>
f02e1e8a 22#include <lttng/ust-ctl.h>
3bd1e081
MD
23#include <poll.h>
24#include <pthread.h>
25#include <stdlib.h>
26#include <string.h>
27#include <sys/mman.h>
28#include <sys/socket.h>
dbb5dfe6 29#include <sys/stat.h>
3bd1e081 30#include <sys/types.h>
77c7c900 31#include <inttypes.h>
3bd1e081 32#include <unistd.h>
ffe60014 33#include <urcu/list.h>
331744e3 34#include <signal.h>
0857097f 35
51a9e1c7 36#include <bin/lttng-consumerd/health-consumerd.h>
990570ed 37#include <common/common.h>
10a8a223 38#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 39#include <common/relayd/relayd.h>
dbb5dfe6 40#include <common/compat/fcntl.h>
f263b7fd 41#include <common/compat/endian.h>
331744e3 42#include <common/consumer-metadata-cache.h>
10a50311 43#include <common/consumer-stream.h>
331744e3 44#include <common/consumer-timer.h>
fe4477ee 45#include <common/utils.h>
309167d2 46#include <common/index/index.h>
10a8a223
DG
47
48#include "ust-consumer.h"
3bd1e081
MD
49
50extern struct lttng_consumer_global_data consumer_data;
51extern int consumer_poll_timeout;
52extern volatile int consumer_quit;
53
54/*
ffe60014
DG
55 * Free channel object and all streams associated with it. This MUST be used
56 * only and only if the channel has _NEVER_ been added to the global channel
57 * hash table.
3bd1e081 58 */
ffe60014 59static void destroy_channel(struct lttng_consumer_channel *channel)
3bd1e081 60{
ffe60014
DG
61 struct lttng_consumer_stream *stream, *stmp;
62
63 assert(channel);
64
65 DBG("UST consumer cleaning stream list");
66
67 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
68 send_node) {
9ce5646a
MD
69
70 health_code_update();
71
ffe60014
DG
72 cds_list_del(&stream->send_node);
73 ustctl_destroy_stream(stream->ustream);
74 free(stream);
75 }
76
77 /*
78 * If a channel is available meaning that was created before the streams
79 * were, delete it.
80 */
81 if (channel->uchan) {
82 lttng_ustconsumer_del_channel(channel);
83 }
3d071855
MD
84 /* Try to rmdir all directories under shm_path root. */
85 if (channel->root_shm_path[0]) {
86 (void) utils_recursive_rmdir(channel->root_shm_path);
87 }
ffe60014
DG
88 free(channel);
89}
3bd1e081
MD
90
91/*
ffe60014 92 * Add channel to internal consumer state.
3bd1e081 93 *
ffe60014 94 * Returns 0 on success or else a negative value.
3bd1e081 95 */
ffe60014
DG
96static int add_channel(struct lttng_consumer_channel *channel,
97 struct lttng_consumer_local_data *ctx)
3bd1e081
MD
98{
99 int ret = 0;
100
ffe60014
DG
101 assert(channel);
102 assert(ctx);
103
104 if (ctx->on_recv_channel != NULL) {
105 ret = ctx->on_recv_channel(channel);
106 if (ret == 0) {
d8ef542d 107 ret = consumer_add_channel(channel, ctx);
ffe60014
DG
108 } else if (ret < 0) {
109 /* Most likely an ENOMEM. */
110 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
111 goto error;
112 }
113 } else {
d8ef542d 114 ret = consumer_add_channel(channel, ctx);
3bd1e081
MD
115 }
116
d88aee68 117 DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
ffe60014
DG
118
119error:
3bd1e081
MD
120 return ret;
121}
122
123/*
ffe60014
DG
124 * Allocate and return a consumer channel object.
125 */
126static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
127 const char *pathname, const char *name, uid_t uid, gid_t gid,
da009f2c 128 uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
2bba9e53 129 uint64_t tracefile_size, uint64_t tracefile_count,
ecc48a90 130 uint64_t session_id_per_pid, unsigned int monitor,
d7ba1388 131 unsigned int live_timer_interval,
3d071855 132 const char *root_shm_path, const char *shm_path)
ffe60014
DG
133{
134 assert(pathname);
135 assert(name);
136
1950109e
JD
137 return consumer_allocate_channel(key, session_id, pathname, name, uid,
138 gid, relayd_id, output, tracefile_size,
d7ba1388 139 tracefile_count, session_id_per_pid, monitor,
3d071855 140 live_timer_interval, root_shm_path, shm_path);
ffe60014
DG
141}
142
143/*
144 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
145 * error value if applicable is set in it else it is kept untouched.
3bd1e081 146 *
ffe60014 147 * Return NULL on error else the newly allocated stream object.
3bd1e081 148 */
ffe60014
DG
149static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
150 struct lttng_consumer_channel *channel,
151 struct lttng_consumer_local_data *ctx, int *_alloc_ret)
152{
153 int alloc_ret;
154 struct lttng_consumer_stream *stream = NULL;
155
156 assert(channel);
157 assert(ctx);
158
159 stream = consumer_allocate_stream(channel->key,
160 key,
161 LTTNG_CONSUMER_ACTIVE_STREAM,
162 channel->name,
163 channel->uid,
164 channel->gid,
165 channel->relayd_id,
166 channel->session_id,
167 cpu,
168 &alloc_ret,
4891ece8
DG
169 channel->type,
170 channel->monitor);
ffe60014
DG
171 if (stream == NULL) {
172 switch (alloc_ret) {
173 case -ENOENT:
174 /*
175 * We could not find the channel. Can happen if cpu hotplug
176 * happens while tearing down.
177 */
178 DBG3("Could not find channel");
179 break;
180 case -ENOMEM:
181 case -EINVAL:
182 default:
183 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
184 break;
185 }
186 goto error;
187 }
188
189 stream->chan = channel;
190
191error:
192 if (_alloc_ret) {
193 *_alloc_ret = alloc_ret;
194 }
195 return stream;
196}
197
198/*
199 * Send the given stream pointer to the corresponding thread.
200 *
201 * Returns 0 on success else a negative value.
202 */
203static int send_stream_to_thread(struct lttng_consumer_stream *stream,
204 struct lttng_consumer_local_data *ctx)
205{
dae10966
DG
206 int ret;
207 struct lttng_pipe *stream_pipe;
ffe60014
DG
208
209 /* Get the right pipe where the stream will be sent. */
210 if (stream->metadata_flag) {
5ab66908
MD
211 ret = consumer_add_metadata_stream(stream);
212 if (ret) {
213 ERR("Consumer add metadata stream %" PRIu64 " failed.",
214 stream->key);
215 goto error;
216 }
dae10966 217 stream_pipe = ctx->consumer_metadata_pipe;
ffe60014 218 } else {
5ab66908
MD
219 ret = consumer_add_data_stream(stream);
220 if (ret) {
221 ERR("Consumer add stream %" PRIu64 " failed.",
222 stream->key);
223 goto error;
224 }
dae10966 225 stream_pipe = ctx->consumer_data_pipe;
ffe60014
DG
226 }
227
5ab66908
MD
228 /*
229 * From this point on, the stream's ownership has been moved away from
230 * the channel and becomes globally visible.
231 */
232 stream->globally_visible = 1;
233
dae10966 234 ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
ffe60014 235 if (ret < 0) {
dae10966
DG
236 ERR("Consumer write %s stream to pipe %d",
237 stream->metadata_flag ? "metadata" : "data",
238 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
239 if (stream->metadata_flag) {
240 consumer_del_stream_for_metadata(stream);
241 } else {
242 consumer_del_stream_for_data(stream);
243 }
ffe60014 244 }
5ab66908 245error:
ffe60014
DG
246 return ret;
247}
248
d88aee68
DG
249/*
250 * Create streams for the given channel using liblttng-ust-ctl.
251 *
252 * Return 0 on success else a negative value.
253 */
ffe60014
DG
254static int create_ust_streams(struct lttng_consumer_channel *channel,
255 struct lttng_consumer_local_data *ctx)
256{
257 int ret, cpu = 0;
258 struct ustctl_consumer_stream *ustream;
259 struct lttng_consumer_stream *stream;
260
261 assert(channel);
262 assert(ctx);
263
264 /*
265 * While a stream is available from ustctl. When NULL is returned, we've
266 * reached the end of the possible stream for the channel.
267 */
268 while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
269 int wait_fd;
04ef1097 270 int ust_metadata_pipe[2];
ffe60014 271
9ce5646a
MD
272 health_code_update();
273
04ef1097
MD
274 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
275 ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
276 if (ret < 0) {
277 ERR("Create ust metadata poll pipe");
278 goto error;
279 }
280 wait_fd = ust_metadata_pipe[0];
281 } else {
282 wait_fd = ustctl_stream_get_wait_fd(ustream);
283 }
ffe60014
DG
284
285 /* Allocate consumer stream object. */
286 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
287 if (!stream) {
288 goto error_alloc;
289 }
290 stream->ustream = ustream;
291 /*
292 * Store it so we can save multiple function calls afterwards since
293 * this value is used heavily in the stream threads. This is UST
294 * specific so this is why it's done after allocation.
295 */
296 stream->wait_fd = wait_fd;
297
b31398bb
DG
298 /*
299 * Increment channel refcount since the channel reference has now been
300 * assigned in the allocation process above.
301 */
10a50311
JD
302 if (stream->chan->monitor) {
303 uatomic_inc(&stream->chan->refcount);
304 }
b31398bb 305
ffe60014
DG
306 /*
307 * Order is important this is why a list is used. On error, the caller
308 * should clean this list.
309 */
310 cds_list_add_tail(&stream->send_node, &channel->streams.head);
311
312 ret = ustctl_get_max_subbuf_size(stream->ustream,
313 &stream->max_sb_size);
314 if (ret < 0) {
315 ERR("ustctl_get_max_subbuf_size failed for stream %s",
316 stream->name);
317 goto error;
318 }
319
320 /* Do actions once stream has been received. */
321 if (ctx->on_recv_stream) {
322 ret = ctx->on_recv_stream(stream);
323 if (ret < 0) {
324 goto error;
325 }
326 }
327
d88aee68 328 DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
ffe60014
DG
329 stream->name, stream->key, stream->relayd_stream_id);
330
331 /* Set next CPU stream. */
332 channel->streams.count = ++cpu;
d88aee68
DG
333
334 /* Keep stream reference when creating metadata. */
335 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
336 channel->metadata_stream = stream;
04ef1097
MD
337 stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0];
338 stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1];
d88aee68 339 }
ffe60014
DG
340 }
341
342 return 0;
343
344error:
345error_alloc:
346 return ret;
347}
348
349/*
350 * Create an UST channel with the given attributes and send it to the session
351 * daemon using the ust ctl API.
352 *
353 * Return 0 on success or else a negative value.
354 */
355static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
356 struct ustctl_consumer_channel **chanp)
357{
358 int ret;
359 struct ustctl_consumer_channel *channel;
360
361 assert(attr);
362 assert(chanp);
363
364 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
365 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
366 "switch_timer_interval: %u, read_timer_interval: %u, "
367 "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
368 attr->num_subbuf, attr->switch_timer_interval,
369 attr->read_timer_interval, attr->output, attr->type);
370
371 channel = ustctl_create_channel(attr);
372 if (!channel) {
373 ret = -1;
374 goto error_create;
375 }
376
377 *chanp = channel;
378
379 return 0;
380
381error_create:
382 return ret;
383}
384
d88aee68
DG
385/*
386 * Send a single given stream to the session daemon using the sock.
387 *
388 * Return 0 on success else a negative value.
389 */
ffe60014
DG
390static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
391{
392 int ret;
393
394 assert(stream);
395 assert(sock >= 0);
396
3eb914c0 397 DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
ffe60014
DG
398
399 /* Send stream to session daemon. */
400 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
401 if (ret < 0) {
402 goto error;
403 }
404
ffe60014
DG
405error:
406 return ret;
407}
408
409/*
410 * Send channel to sessiond.
411 *
d88aee68 412 * Return 0 on success or else a negative value.
ffe60014
DG
413 */
414static int send_sessiond_channel(int sock,
415 struct lttng_consumer_channel *channel,
416 struct lttng_consumer_local_data *ctx, int *relayd_error)
417{
0c759fc9 418 int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ffe60014 419 struct lttng_consumer_stream *stream;
a4baae1b 420 uint64_t net_seq_idx = -1ULL;
ffe60014
DG
421
422 assert(channel);
423 assert(ctx);
424 assert(sock >= 0);
425
426 DBG("UST consumer sending channel %s to sessiond", channel->name);
427
62285ea4
DG
428 if (channel->relayd_id != (uint64_t) -1ULL) {
429 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
9ce5646a
MD
430
431 health_code_update();
432
62285ea4
DG
433 /* Try to send the stream to the relayd if one is available. */
434 ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
435 if (ret < 0) {
436 /*
437 * Flag that the relayd was the problem here probably due to a
438 * communicaton error on the socket.
439 */
440 if (relayd_error) {
441 *relayd_error = 1;
442 }
725d28b2 443 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
ffe60014 444 }
a4baae1b
JD
445 if (net_seq_idx == -1ULL) {
446 net_seq_idx = stream->net_seq_idx;
447 }
448 }
f2a444f1 449 }
ffe60014 450
f2a444f1
DG
451 /* Inform sessiond that we are about to send channel and streams. */
452 ret = consumer_send_status_msg(sock, ret_code);
0c759fc9 453 if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
f2a444f1
DG
454 /*
455 * Either the session daemon is not responding or the relayd died so we
456 * stop now.
457 */
458 goto error;
459 }
460
461 /* Send channel to sessiond. */
462 ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
463 if (ret < 0) {
464 goto error;
465 }
466
467 ret = ustctl_channel_close_wakeup_fd(channel->uchan);
468 if (ret < 0) {
469 goto error;
470 }
471
472 /* The channel was sent successfully to the sessiond at this point. */
473 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
9ce5646a
MD
474
475 health_code_update();
476
ffe60014
DG
477 /* Send stream to session daemon. */
478 ret = send_sessiond_stream(sock, stream);
479 if (ret < 0) {
480 goto error;
481 }
482 }
483
484 /* Tell sessiond there is no more stream. */
485 ret = ustctl_send_stream_to_sessiond(sock, NULL);
486 if (ret < 0) {
487 goto error;
488 }
489
490 DBG("UST consumer NULL stream sent to sessiond");
491
492 return 0;
493
494error:
0c759fc9 495 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
f2a444f1
DG
496 ret = -1;
497 }
ffe60014
DG
498 return ret;
499}
500
501/*
502 * Creates a channel and streams and add the channel it to the channel internal
503 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
504 * received.
505 *
506 * Return 0 on success or else, a negative value is returned and the channel
507 * MUST be destroyed by consumer_del_channel().
508 */
509static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
510 struct lttng_consumer_channel *channel,
511 struct ustctl_consumer_channel_attr *attr)
3bd1e081
MD
512{
513 int ret;
514
ffe60014
DG
515 assert(ctx);
516 assert(channel);
517 assert(attr);
518
519 /*
520 * This value is still used by the kernel consumer since for the kernel,
521 * the stream ownership is not IN the consumer so we need to have the
522 * number of left stream that needs to be initialized so we can know when
523 * to delete the channel (see consumer.c).
524 *
525 * As for the user space tracer now, the consumer creates and sends the
526 * stream to the session daemon which only sends them to the application
527 * once every stream of a channel is received making this value useless
528 * because we they will be added to the poll thread before the application
529 * receives them. This ensures that a stream can not hang up during
530 * initilization of a channel.
531 */
532 channel->nb_init_stream_left = 0;
533
534 /* The reply msg status is handled in the following call. */
535 ret = create_ust_channel(attr, &channel->uchan);
536 if (ret < 0) {
10a50311 537 goto end;
3bd1e081
MD
538 }
539
d8ef542d
MD
540 channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
541
10a50311
JD
542 /*
543 * For the snapshots (no monitor), we create the metadata streams
544 * on demand, not during the channel creation.
545 */
546 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
547 ret = 0;
548 goto end;
549 }
550
ffe60014
DG
551 /* Open all streams for this channel. */
552 ret = create_ust_streams(channel, ctx);
553 if (ret < 0) {
10a50311 554 goto end;
ffe60014
DG
555 }
556
10a50311 557end:
3bd1e081
MD
558 return ret;
559}
560
d88aee68
DG
561/*
562 * Send all stream of a channel to the right thread handling it.
563 *
564 * On error, return a negative value else 0 on success.
565 */
566static int send_streams_to_thread(struct lttng_consumer_channel *channel,
567 struct lttng_consumer_local_data *ctx)
568{
569 int ret = 0;
570 struct lttng_consumer_stream *stream, *stmp;
571
572 assert(channel);
573 assert(ctx);
574
575 /* Send streams to the corresponding thread. */
576 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
577 send_node) {
9ce5646a
MD
578
579 health_code_update();
580
d88aee68
DG
581 /* Sending the stream to the thread. */
582 ret = send_stream_to_thread(stream, ctx);
583 if (ret < 0) {
584 /*
585 * If we are unable to send the stream to the thread, there is
586 * a big problem so just stop everything.
587 */
5ab66908
MD
588 /* Remove node from the channel stream list. */
589 cds_list_del(&stream->send_node);
d88aee68
DG
590 goto error;
591 }
592
593 /* Remove node from the channel stream list. */
594 cds_list_del(&stream->send_node);
4891ece8 595
d88aee68
DG
596 }
597
598error:
599 return ret;
600}
601
7972aab2
DG
602/*
603 * Flush channel's streams using the given key to retrieve the channel.
604 *
605 * Return 0 on success else an LTTng error code.
606 */
607static int flush_channel(uint64_t chan_key)
608{
609 int ret = 0;
610 struct lttng_consumer_channel *channel;
611 struct lttng_consumer_stream *stream;
612 struct lttng_ht *ht;
613 struct lttng_ht_iter iter;
614
8fd623e0 615 DBG("UST consumer flush channel key %" PRIu64, chan_key);
7972aab2 616
a500c257 617 rcu_read_lock();
7972aab2
DG
618 channel = consumer_find_channel(chan_key);
619 if (!channel) {
8fd623e0 620 ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
7972aab2
DG
621 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
622 goto error;
623 }
624
625 ht = consumer_data.stream_per_chan_id_ht;
626
627 /* For each stream of the channel id, flush it. */
7972aab2
DG
628 cds_lfht_for_each_entry_duplicate(ht->ht,
629 ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
630 &channel->key, &iter.iter, stream, node_channel_id.node) {
9ce5646a
MD
631
632 health_code_update();
633
b8086166 634 ustctl_flush_buffer(stream->ustream, 1);
7972aab2 635 }
7972aab2 636error:
a500c257 637 rcu_read_unlock();
7972aab2
DG
638 return ret;
639}
640
d88aee68
DG
641/*
642 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
a500c257 643 * RCU read side lock MUST be acquired before calling this function.
d88aee68
DG
644 *
645 * Return 0 on success else an LTTng error code.
646 */
647static int close_metadata(uint64_t chan_key)
648{
ea88ca2a 649 int ret = 0;
d88aee68
DG
650 struct lttng_consumer_channel *channel;
651
8fd623e0 652 DBG("UST consumer close metadata key %" PRIu64, chan_key);
d88aee68
DG
653
654 channel = consumer_find_channel(chan_key);
655 if (!channel) {
84cc9aa0
DG
656 /*
657 * This is possible if the metadata thread has issue a delete because
658 * the endpoint point of the stream hung up. There is no way the
659 * session daemon can know about it thus use a DBG instead of an actual
660 * error.
661 */
662 DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
d88aee68
DG
663 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
664 goto error;
665 }
666
ea88ca2a 667 pthread_mutex_lock(&consumer_data.lock);
a9838785 668 pthread_mutex_lock(&channel->lock);
73811ecc
DG
669
670 if (cds_lfht_is_node_deleted(&channel->node.node)) {
671 goto error_unlock;
672 }
673
6d574024 674 lttng_ustconsumer_close_metadata(channel);
d88aee68 675
ea88ca2a 676error_unlock:
a9838785 677 pthread_mutex_unlock(&channel->lock);
ea88ca2a 678 pthread_mutex_unlock(&consumer_data.lock);
d88aee68
DG
679error:
680 return ret;
681}
682
683/*
684 * RCU read side lock MUST be acquired before calling this function.
685 *
686 * Return 0 on success else an LTTng error code.
687 */
688static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
689{
690 int ret;
691 struct lttng_consumer_channel *metadata;
692
8fd623e0 693 DBG("UST consumer setup metadata key %" PRIu64, key);
d88aee68
DG
694
695 metadata = consumer_find_channel(key);
696 if (!metadata) {
697 ERR("UST consumer push metadata %" PRIu64 " not found", key);
698 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
10a50311
JD
699 goto end;
700 }
701
702 /*
703 * In no monitor mode, the metadata channel has no stream(s) so skip the
704 * ownership transfer to the metadata thread.
705 */
706 if (!metadata->monitor) {
707 DBG("Metadata channel in no monitor");
708 ret = 0;
709 goto end;
d88aee68
DG
710 }
711
712 /*
713 * Send metadata stream to relayd if one available. Availability is
714 * known if the stream is still in the list of the channel.
715 */
716 if (cds_list_empty(&metadata->streams.head)) {
717 ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
718 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
f5a0c9cf 719 goto error_no_stream;
d88aee68
DG
720 }
721
722 /* Send metadata stream to relayd if needed. */
62285ea4
DG
723 if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
724 ret = consumer_send_relayd_stream(metadata->metadata_stream,
725 metadata->pathname);
726 if (ret < 0) {
727 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
728 goto error;
729 }
601262d6
JD
730 ret = consumer_send_relayd_streams_sent(
731 metadata->metadata_stream->net_seq_idx);
732 if (ret < 0) {
733 ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
734 goto error;
735 }
d88aee68
DG
736 }
737
738 ret = send_streams_to_thread(metadata, ctx);
739 if (ret < 0) {
740 /*
741 * If we are unable to send the stream to the thread, there is
742 * a big problem so just stop everything.
743 */
744 ret = LTTCOMM_CONSUMERD_FATAL;
745 goto error;
746 }
747 /* List MUST be empty after or else it could be reused. */
748 assert(cds_list_empty(&metadata->streams.head));
749
10a50311
JD
750 ret = 0;
751 goto end;
d88aee68
DG
752
753error:
f2a444f1
DG
754 /*
755 * Delete metadata channel on error. At this point, the metadata stream can
756 * NOT be monitored by the metadata thread thus having the guarantee that
757 * the stream is still in the local stream list of the channel. This call
758 * will make sure to clean that list.
759 */
f5a0c9cf 760 consumer_stream_destroy(metadata->metadata_stream, NULL);
212d67a2
DG
761 cds_list_del(&metadata->metadata_stream->send_node);
762 metadata->metadata_stream = NULL;
f5a0c9cf 763error_no_stream:
10a50311
JD
764end:
765 return ret;
766}
767
768/*
769 * Snapshot the whole metadata.
770 *
771 * Returns 0 on success, < 0 on error
772 */
773static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
774 struct lttng_consumer_local_data *ctx)
775{
776 int ret = 0;
10a50311
JD
777 struct lttng_consumer_channel *metadata_channel;
778 struct lttng_consumer_stream *metadata_stream;
779
780 assert(path);
781 assert(ctx);
782
783 DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
784 key, path);
785
786 rcu_read_lock();
787
788 metadata_channel = consumer_find_channel(key);
789 if (!metadata_channel) {
6a00837f
MD
790 ERR("UST snapshot metadata channel not found for key %" PRIu64,
791 key);
10a50311
JD
792 ret = -1;
793 goto error;
794 }
795 assert(!metadata_channel->monitor);
796
9ce5646a
MD
797 health_code_update();
798
10a50311
JD
799 /*
800 * Ask the sessiond if we have new metadata waiting and update the
801 * consumer metadata cache.
802 */
94d49140 803 ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
10a50311
JD
804 if (ret < 0) {
805 goto error;
806 }
807
9ce5646a
MD
808 health_code_update();
809
10a50311
JD
810 /*
811 * The metadata stream is NOT created in no monitor mode when the channel
812 * is created on a sessiond ask channel command.
813 */
814 ret = create_ust_streams(metadata_channel, ctx);
815 if (ret < 0) {
816 goto error;
817 }
818
819 metadata_stream = metadata_channel->metadata_stream;
820 assert(metadata_stream);
821
822 if (relayd_id != (uint64_t) -1ULL) {
823 metadata_stream->net_seq_idx = relayd_id;
824 ret = consumer_send_relayd_stream(metadata_stream, path);
825 if (ret < 0) {
826 goto error_stream;
827 }
828 } else {
829 ret = utils_create_stream_file(path, metadata_stream->name,
830 metadata_stream->chan->tracefile_size,
831 metadata_stream->tracefile_count_current,
309167d2 832 metadata_stream->uid, metadata_stream->gid, NULL);
10a50311
JD
833 if (ret < 0) {
834 goto error_stream;
835 }
836 metadata_stream->out_fd = ret;
837 metadata_stream->tracefile_size_current = 0;
838 }
839
04ef1097 840 do {
9ce5646a
MD
841 health_code_update();
842
10a50311
JD
843 ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
844 if (ret < 0) {
94d49140 845 goto error_stream;
10a50311 846 }
04ef1097 847 } while (ret > 0);
10a50311 848
10a50311
JD
849error_stream:
850 /*
851 * Clean up the stream completly because the next snapshot will use a new
852 * metadata stream.
853 */
10a50311 854 consumer_stream_destroy(metadata_stream, NULL);
212d67a2 855 cds_list_del(&metadata_stream->send_node);
10a50311
JD
856 metadata_channel->metadata_stream = NULL;
857
858error:
859 rcu_read_unlock();
860 return ret;
861}
862
863/*
864 * Take a snapshot of all the stream of a channel.
865 *
866 * Returns 0 on success, < 0 on error
867 */
868static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
d07ceecd 869 uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx)
10a50311
JD
870{
871 int ret;
872 unsigned use_relayd = 0;
873 unsigned long consumed_pos, produced_pos;
874 struct lttng_consumer_channel *channel;
875 struct lttng_consumer_stream *stream;
876
877 assert(path);
878 assert(ctx);
879
880 rcu_read_lock();
881
882 if (relayd_id != (uint64_t) -1ULL) {
883 use_relayd = 1;
884 }
885
886 channel = consumer_find_channel(key);
887 if (!channel) {
6a00837f 888 ERR("UST snapshot channel not found for key %" PRIu64, key);
10a50311
JD
889 ret = -1;
890 goto error;
891 }
892 assert(!channel->monitor);
6a00837f 893 DBG("UST consumer snapshot channel %" PRIu64, key);
10a50311
JD
894
895 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
9ce5646a
MD
896
897 health_code_update();
898
10a50311
JD
899 /* Lock stream because we are about to change its state. */
900 pthread_mutex_lock(&stream->lock);
901 stream->net_seq_idx = relayd_id;
902
903 if (use_relayd) {
904 ret = consumer_send_relayd_stream(stream, path);
905 if (ret < 0) {
906 goto error_unlock;
907 }
908 } else {
909 ret = utils_create_stream_file(path, stream->name,
910 stream->chan->tracefile_size,
911 stream->tracefile_count_current,
309167d2 912 stream->uid, stream->gid, NULL);
10a50311
JD
913 if (ret < 0) {
914 goto error_unlock;
915 }
916 stream->out_fd = ret;
917 stream->tracefile_size_current = 0;
918
919 DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
920 stream->name, stream->key);
921 }
a4baae1b
JD
922 if (relayd_id != -1ULL) {
923 ret = consumer_send_relayd_streams_sent(relayd_id);
924 if (ret < 0) {
925 goto error_unlock;
926 }
927 }
10a50311
JD
928
929 ustctl_flush_buffer(stream->ustream, 1);
930
931 ret = lttng_ustconsumer_take_snapshot(stream);
932 if (ret < 0) {
933 ERR("Taking UST snapshot");
934 goto error_unlock;
935 }
936
937 ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
938 if (ret < 0) {
939 ERR("Produced UST snapshot position");
940 goto error_unlock;
941 }
942
943 ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
944 if (ret < 0) {
945 ERR("Consumerd UST snapshot position");
946 goto error_unlock;
947 }
948
5c786ded
JD
949 /*
950 * The original value is sent back if max stream size is larger than
d07ceecd 951 * the possible size of the snapshot. Also, we assume that the session
5c786ded
JD
952 * daemon should never send a maximum stream size that is lower than
953 * subbuffer size.
954 */
d07ceecd
MD
955 consumed_pos = consumer_get_consume_start_pos(consumed_pos,
956 produced_pos, nb_packets_per_stream,
957 stream->max_sb_size);
5c786ded 958
10a50311
JD
959 while (consumed_pos < produced_pos) {
960 ssize_t read_len;
961 unsigned long len, padded_len;
962
9ce5646a
MD
963 health_code_update();
964
10a50311
JD
965 DBG("UST consumer taking snapshot at pos %lu", consumed_pos);
966
967 ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
968 if (ret < 0) {
969 if (ret != -EAGAIN) {
970 PERROR("ustctl_get_subbuf snapshot");
971 goto error_close_stream;
972 }
973 DBG("UST consumer get subbuf failed. Skipping it.");
974 consumed_pos += stream->max_sb_size;
975 continue;
976 }
977
978 ret = ustctl_get_subbuf_size(stream->ustream, &len);
979 if (ret < 0) {
980 ERR("Snapshot ustctl_get_subbuf_size");
981 goto error_put_subbuf;
982 }
983
984 ret = ustctl_get_padded_subbuf_size(stream->ustream, &padded_len);
985 if (ret < 0) {
986 ERR("Snapshot ustctl_get_padded_subbuf_size");
987 goto error_put_subbuf;
988 }
989
990 read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
309167d2 991 padded_len - len, NULL);
10a50311
JD
992 if (use_relayd) {
993 if (read_len != len) {
56591bac 994 ret = -EPERM;
10a50311
JD
995 goto error_put_subbuf;
996 }
997 } else {
998 if (read_len != padded_len) {
56591bac 999 ret = -EPERM;
10a50311
JD
1000 goto error_put_subbuf;
1001 }
1002 }
1003
1004 ret = ustctl_put_subbuf(stream->ustream);
1005 if (ret < 0) {
1006 ERR("Snapshot ustctl_put_subbuf");
1007 goto error_close_stream;
1008 }
1009 consumed_pos += stream->max_sb_size;
1010 }
1011
1012 /* Simply close the stream so we can use it on the next snapshot. */
1013 consumer_stream_close(stream);
1014 pthread_mutex_unlock(&stream->lock);
1015 }
1016
1017 rcu_read_unlock();
1018 return 0;
1019
1020error_put_subbuf:
1021 if (ustctl_put_subbuf(stream->ustream) < 0) {
1022 ERR("Snapshot ustctl_put_subbuf");
1023 }
1024error_close_stream:
1025 consumer_stream_close(stream);
1026error_unlock:
1027 pthread_mutex_unlock(&stream->lock);
1028error:
1029 rcu_read_unlock();
d88aee68
DG
1030 return ret;
1031}
1032
331744e3
JD
1033/*
1034 * Receive the metadata updates from the sessiond.
1035 */
1036int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
5e41ebe1 1037 uint64_t len, struct lttng_consumer_channel *channel,
94d49140 1038 int timer, int wait)
331744e3 1039{
0c759fc9 1040 int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
331744e3
JD
1041 char *metadata_str;
1042
8fd623e0 1043 DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
331744e3
JD
1044
1045 metadata_str = zmalloc(len * sizeof(char));
1046 if (!metadata_str) {
1047 PERROR("zmalloc metadata string");
1048 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
1049 goto end;
1050 }
1051
9ce5646a
MD
1052 health_code_update();
1053
331744e3
JD
1054 /* Receive metadata string. */
1055 ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
1056 if (ret < 0) {
1057 /* Session daemon is dead so return gracefully. */
1058 ret_code = ret;
1059 goto end_free;
1060 }
1061
9ce5646a
MD
1062 health_code_update();
1063
331744e3
JD
1064 pthread_mutex_lock(&channel->metadata_cache->lock);
1065 ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
1066 if (ret < 0) {
1067 /* Unable to handle metadata. Notify session daemon. */
1068 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
a32bd775
DG
1069 /*
1070 * Skip metadata flush on write error since the offset and len might
1071 * not have been updated which could create an infinite loop below when
1072 * waiting for the metadata cache to be flushed.
1073 */
1074 pthread_mutex_unlock(&channel->metadata_cache->lock);
a32bd775 1075 goto end_free;
331744e3
JD
1076 }
1077 pthread_mutex_unlock(&channel->metadata_cache->lock);
1078
94d49140
JD
1079 if (!wait) {
1080 goto end_free;
1081 }
5e41ebe1 1082 while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
331744e3 1083 DBG("Waiting for metadata to be flushed");
9ce5646a
MD
1084
1085 health_code_update();
1086
331744e3
JD
1087 usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
1088 }
1089
1090end_free:
1091 free(metadata_str);
1092end:
1093 return ret_code;
1094}
1095
4cbc1a04
DG
1096/*
1097 * Receive command from session daemon and process it.
1098 *
1099 * Return 1 on success else a negative value or 0.
1100 */
3bd1e081
MD
1101int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1102 int sock, struct pollfd *consumer_sockpoll)
1103{
1104 ssize_t ret;
0c759fc9 1105 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3bd1e081 1106 struct lttcomm_consumer_msg msg;
ffe60014 1107 struct lttng_consumer_channel *channel = NULL;
3bd1e081 1108
9ce5646a
MD
1109 health_code_update();
1110
3bd1e081
MD
1111 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
1112 if (ret != sizeof(msg)) {
173af62f
DG
1113 DBG("Consumer received unexpected message size %zd (expects %zu)",
1114 ret, sizeof(msg));
3be74084
DG
1115 /*
1116 * The ret value might 0 meaning an orderly shutdown but this is ok
1117 * since the caller handles this.
1118 */
489f70e9 1119 if (ret > 0) {
c6857fcf 1120 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
489f70e9
MD
1121 ret = -1;
1122 }
3bd1e081
MD
1123 return ret;
1124 }
9ce5646a
MD
1125
1126 health_code_update();
1127
84382d49
MD
1128 /* deprecated */
1129 assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
3bd1e081 1130
9ce5646a
MD
1131 health_code_update();
1132
3f8e211f 1133 /* relayd needs RCU read-side lock */
b0b335c8
MD
1134 rcu_read_lock();
1135
3bd1e081 1136 switch (msg.cmd_type) {
00e2e675
DG
1137 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
1138 {
f50f23d9 1139 /* Session daemon status message are handled in the following call. */
7735ef9e
DG
1140 ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
1141 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
d3e2ba59
JD
1142 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
1143 msg.u.relayd_sock.relayd_session_id);
00e2e675
DG
1144 goto end_nosignal;
1145 }
173af62f
DG
1146 case LTTNG_CONSUMER_DESTROY_RELAYD:
1147 {
a6ba4fe1 1148 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
173af62f
DG
1149 struct consumer_relayd_sock_pair *relayd;
1150
a6ba4fe1 1151 DBG("UST consumer destroying relayd %" PRIu64, index);
173af62f
DG
1152
1153 /* Get relayd reference if exists. */
a6ba4fe1 1154 relayd = consumer_find_relayd(index);
173af62f 1155 if (relayd == NULL) {
3448e266 1156 DBG("Unable to find relayd %" PRIu64, index);
e462382a 1157 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
173af62f
DG
1158 }
1159
a6ba4fe1
DG
1160 /*
1161 * Each relayd socket pair has a refcount of stream attached to it
1162 * which tells if the relayd is still active or not depending on the
1163 * refcount value.
1164 *
1165 * This will set the destroy flag of the relayd object and destroy it
1166 * if the refcount reaches zero when called.
1167 *
1168 * The destroy can happen either here or when a stream fd hangs up.
1169 */
f50f23d9
DG
1170 if (relayd) {
1171 consumer_flag_relayd_for_destroy(relayd);
1172 }
1173
d88aee68 1174 goto end_msg_sessiond;
173af62f 1175 }
3bd1e081
MD
1176 case LTTNG_CONSUMER_UPDATE_STREAM:
1177 {
3f8e211f 1178 rcu_read_unlock();
7ad0a0cb 1179 return -ENOSYS;
3bd1e081 1180 }
6d805429 1181 case LTTNG_CONSUMER_DATA_PENDING:
53632229 1182 {
3be74084 1183 int ret, is_data_pending;
6d805429 1184 uint64_t id = msg.u.data_pending.session_id;
ca22feea 1185
6d805429 1186 DBG("UST consumer data pending command for id %" PRIu64, id);
ca22feea 1187
3be74084 1188 is_data_pending = consumer_data_pending(id);
ca22feea
DG
1189
1190 /* Send back returned value to session daemon */
3be74084
DG
1191 ret = lttcomm_send_unix_sock(sock, &is_data_pending,
1192 sizeof(is_data_pending));
ca22feea 1193 if (ret < 0) {
3be74084 1194 DBG("Error when sending the data pending ret code: %d", ret);
489f70e9 1195 goto error_fatal;
ca22feea 1196 }
f50f23d9
DG
1197
1198 /*
1199 * No need to send back a status message since the data pending
1200 * returned value is the response.
1201 */
ca22feea 1202 break;
53632229 1203 }
ffe60014
DG
1204 case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
1205 {
1206 int ret;
1207 struct ustctl_consumer_channel_attr attr;
1208
1209 /* Create a plain object and reserve a channel key. */
1210 channel = allocate_channel(msg.u.ask_channel.session_id,
1211 msg.u.ask_channel.pathname, msg.u.ask_channel.name,
1212 msg.u.ask_channel.uid, msg.u.ask_channel.gid,
1213 msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
1624d5b7
JD
1214 (enum lttng_event_output) msg.u.ask_channel.output,
1215 msg.u.ask_channel.tracefile_size,
2bba9e53 1216 msg.u.ask_channel.tracefile_count,
1950109e 1217 msg.u.ask_channel.session_id_per_pid,
ecc48a90 1218 msg.u.ask_channel.monitor,
d7ba1388 1219 msg.u.ask_channel.live_timer_interval,
3d071855 1220 msg.u.ask_channel.root_shm_path,
d7ba1388 1221 msg.u.ask_channel.shm_path);
ffe60014
DG
1222 if (!channel) {
1223 goto end_channel_error;
1224 }
1225
567eb353
DG
1226 /*
1227 * Assign UST application UID to the channel. This value is ignored for
1228 * per PID buffers. This is specific to UST thus setting this after the
1229 * allocation.
1230 */
1231 channel->ust_app_uid = msg.u.ask_channel.ust_app_uid;
1232
ffe60014
DG
1233 /* Build channel attributes from received message. */
1234 attr.subbuf_size = msg.u.ask_channel.subbuf_size;
1235 attr.num_subbuf = msg.u.ask_channel.num_subbuf;
1236 attr.overwrite = msg.u.ask_channel.overwrite;
1237 attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
1238 attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
7972aab2 1239 attr.chan_id = msg.u.ask_channel.chan_id;
ffe60014 1240 memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
d7ba1388
MD
1241 strncpy(attr.shm_path, channel->shm_path,
1242 sizeof(attr.shm_path));
1243 attr.shm_path[sizeof(attr.shm_path) - 1] = '\0';
ffe60014 1244
0c759fc9
DG
1245 /* Match channel buffer type to the UST abi. */
1246 switch (msg.u.ask_channel.output) {
1247 case LTTNG_EVENT_MMAP:
1248 default:
1249 attr.output = LTTNG_UST_MMAP;
1250 break;
1251 }
1252
ffe60014
DG
1253 /* Translate and save channel type. */
1254 switch (msg.u.ask_channel.type) {
1255 case LTTNG_UST_CHAN_PER_CPU:
1256 channel->type = CONSUMER_CHANNEL_TYPE_DATA;
1257 attr.type = LTTNG_UST_CHAN_PER_CPU;
8633d6e3
MD
1258 /*
1259 * Set refcount to 1 for owner. Below, we will
1260 * pass ownership to the
1261 * consumer_thread_channel_poll() thread.
1262 */
1263 channel->refcount = 1;
ffe60014
DG
1264 break;
1265 case LTTNG_UST_CHAN_METADATA:
1266 channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
1267 attr.type = LTTNG_UST_CHAN_METADATA;
1268 break;
1269 default:
1270 assert(0);
1271 goto error_fatal;
1272 };
1273
9ce5646a
MD
1274 health_code_update();
1275
ffe60014
DG
1276 ret = ask_channel(ctx, sock, channel, &attr);
1277 if (ret < 0) {
1278 goto end_channel_error;
1279 }
1280
fc643247
MD
1281 if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
1282 ret = consumer_metadata_cache_allocate(channel);
1283 if (ret < 0) {
1284 ERR("Allocating metadata cache");
1285 goto end_channel_error;
1286 }
1287 consumer_timer_switch_start(channel, attr.switch_timer_interval);
1288 attr.switch_timer_interval = 0;
94d49140
JD
1289 } else {
1290 consumer_timer_live_start(channel,
1291 msg.u.ask_channel.live_timer_interval);
fc643247
MD
1292 }
1293
9ce5646a
MD
1294 health_code_update();
1295
ffe60014
DG
1296 /*
1297 * Add the channel to the internal state AFTER all streams were created
1298 * and successfully sent to session daemon. This way, all streams must
1299 * be ready before this channel is visible to the threads.
fc643247
MD
1300 * If add_channel succeeds, ownership of the channel is
1301 * passed to consumer_thread_channel_poll().
ffe60014
DG
1302 */
1303 ret = add_channel(channel, ctx);
1304 if (ret < 0) {
ea88ca2a
MD
1305 if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
1306 if (channel->switch_timer_enabled == 1) {
1307 consumer_timer_switch_stop(channel);
1308 }
1309 consumer_metadata_cache_destroy(channel);
1310 }
d3e2ba59
JD
1311 if (channel->live_timer_enabled == 1) {
1312 consumer_timer_live_stop(channel);
1313 }
ffe60014
DG
1314 goto end_channel_error;
1315 }
1316
9ce5646a
MD
1317 health_code_update();
1318
ffe60014
DG
1319 /*
1320 * Channel and streams are now created. Inform the session daemon that
1321 * everything went well and should wait to receive the channel and
1322 * streams with ustctl API.
1323 */
1324 ret = consumer_send_status_channel(sock, channel);
1325 if (ret < 0) {
1326 /*
489f70e9 1327 * There is probably a problem on the socket.
ffe60014 1328 */
489f70e9 1329 goto error_fatal;
ffe60014
DG
1330 }
1331
1332 break;
1333 }
1334 case LTTNG_CONSUMER_GET_CHANNEL:
1335 {
1336 int ret, relayd_err = 0;
d88aee68 1337 uint64_t key = msg.u.get_channel.key;
ffe60014 1338 struct lttng_consumer_channel *channel;
ffe60014
DG
1339
1340 channel = consumer_find_channel(key);
1341 if (!channel) {
8fd623e0 1342 ERR("UST consumer get channel key %" PRIu64 " not found", key);
e462382a 1343 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
ffe60014
DG
1344 goto end_msg_sessiond;
1345 }
1346
9ce5646a
MD
1347 health_code_update();
1348
ffe60014
DG
1349 /* Send everything to sessiond. */
1350 ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
1351 if (ret < 0) {
1352 if (relayd_err) {
1353 /*
1354 * We were unable to send to the relayd the stream so avoid
1355 * sending back a fatal error to the thread since this is OK
f2a444f1
DG
1356 * and the consumer can continue its work. The above call
1357 * has sent the error status message to the sessiond.
ffe60014 1358 */
f2a444f1 1359 goto end_nosignal;
ffe60014
DG
1360 }
1361 /*
1362 * The communicaton was broken hence there is a bad state between
1363 * the consumer and sessiond so stop everything.
1364 */
1365 goto error_fatal;
1366 }
1367
9ce5646a
MD
1368 health_code_update();
1369
10a50311
JD
1370 /*
1371 * In no monitor mode, the streams ownership is kept inside the channel
1372 * so don't send them to the data thread.
1373 */
1374 if (!channel->monitor) {
1375 goto end_msg_sessiond;
1376 }
1377
d88aee68
DG
1378 ret = send_streams_to_thread(channel, ctx);
1379 if (ret < 0) {
1380 /*
1381 * If we are unable to send the stream to the thread, there is
1382 * a big problem so just stop everything.
1383 */
1384 goto error_fatal;
ffe60014 1385 }
ffe60014
DG
1386 /* List MUST be empty after or else it could be reused. */
1387 assert(cds_list_empty(&channel->streams.head));
d88aee68
DG
1388 goto end_msg_sessiond;
1389 }
1390 case LTTNG_CONSUMER_DESTROY_CHANNEL:
1391 {
1392 uint64_t key = msg.u.destroy_channel.key;
d88aee68 1393
a0cbdd2e
MD
1394 /*
1395 * Only called if streams have not been sent to stream
1396 * manager thread. However, channel has been sent to
1397 * channel manager thread.
1398 */
1399 notify_thread_del_channel(ctx, key);
d88aee68 1400 goto end_msg_sessiond;
ffe60014 1401 }
d88aee68
DG
1402 case LTTNG_CONSUMER_CLOSE_METADATA:
1403 {
1404 int ret;
1405
1406 ret = close_metadata(msg.u.close_metadata.key);
1407 if (ret != 0) {
1408 ret_code = ret;
1409 }
1410
1411 goto end_msg_sessiond;
1412 }
7972aab2
DG
1413 case LTTNG_CONSUMER_FLUSH_CHANNEL:
1414 {
1415 int ret;
1416
1417 ret = flush_channel(msg.u.flush_channel.key);
1418 if (ret != 0) {
1419 ret_code = ret;
1420 }
1421
1422 goto end_msg_sessiond;
1423 }
d88aee68 1424 case LTTNG_CONSUMER_PUSH_METADATA:
ffe60014
DG
1425 {
1426 int ret;
d88aee68 1427 uint64_t len = msg.u.push_metadata.len;
d88aee68 1428 uint64_t key = msg.u.push_metadata.key;
331744e3 1429 uint64_t offset = msg.u.push_metadata.target_offset;
ffe60014
DG
1430 struct lttng_consumer_channel *channel;
1431
8fd623e0
DG
1432 DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
1433 len);
ffe60014
DG
1434
1435 channel = consumer_find_channel(key);
1436 if (!channel) {
000baf6a
DG
1437 /*
1438 * This is possible if the metadata creation on the consumer side
1439 * is in flight vis-a-vis a concurrent push metadata from the
1440 * session daemon. Simply return that the channel failed and the
1441 * session daemon will handle that message correctly considering
1442 * that this race is acceptable thus the DBG() statement here.
1443 */
1444 DBG("UST consumer push metadata %" PRIu64 " not found", key);
1445 ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
4a2eb0ca 1446 goto end_msg_sessiond;
d88aee68
DG
1447 }
1448
9ce5646a
MD
1449 health_code_update();
1450
d88aee68 1451 /* Tell session daemon we are ready to receive the metadata. */
0c759fc9 1452 ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
ffe60014
DG
1453 if (ret < 0) {
1454 /* Somehow, the session daemon is not responding anymore. */
d88aee68
DG
1455 goto error_fatal;
1456 }
1457
9ce5646a
MD
1458 health_code_update();
1459
d88aee68 1460 /* Wait for more data. */
9ce5646a
MD
1461 health_poll_entry();
1462 ret = lttng_consumer_poll_socket(consumer_sockpoll);
1463 health_poll_exit();
84382d49 1464 if (ret) {
489f70e9 1465 goto error_fatal;
d88aee68
DG
1466 }
1467
9ce5646a
MD
1468 health_code_update();
1469
331744e3 1470 ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
94d49140 1471 len, channel, 0, 1);
d88aee68 1472 if (ret < 0) {
331744e3 1473 /* error receiving from sessiond */
489f70e9 1474 goto error_fatal;
331744e3
JD
1475 } else {
1476 ret_code = ret;
d88aee68
DG
1477 goto end_msg_sessiond;
1478 }
d88aee68
DG
1479 }
1480 case LTTNG_CONSUMER_SETUP_METADATA:
1481 {
1482 int ret;
1483
1484 ret = setup_metadata(ctx, msg.u.setup_metadata.key);
1485 if (ret) {
1486 ret_code = ret;
1487 }
1488 goto end_msg_sessiond;
ffe60014 1489 }
6dc3064a
DG
1490 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
1491 {
10a50311
JD
1492 if (msg.u.snapshot_channel.metadata) {
1493 ret = snapshot_metadata(msg.u.snapshot_channel.key,
1494 msg.u.snapshot_channel.pathname,
1495 msg.u.snapshot_channel.relayd_id,
1496 ctx);
1497 if (ret < 0) {
1498 ERR("Snapshot metadata failed");
e462382a 1499 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
10a50311
JD
1500 }
1501 } else {
1502 ret = snapshot_channel(msg.u.snapshot_channel.key,
1503 msg.u.snapshot_channel.pathname,
1504 msg.u.snapshot_channel.relayd_id,
d07ceecd 1505 msg.u.snapshot_channel.nb_packets_per_stream,
10a50311
JD
1506 ctx);
1507 if (ret < 0) {
1508 ERR("Snapshot channel failed");
e462382a 1509 ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
10a50311
JD
1510 }
1511 }
1512
9ce5646a 1513 health_code_update();
6dc3064a
DG
1514 ret = consumer_send_status_msg(sock, ret_code);
1515 if (ret < 0) {
1516 /* Somehow, the session daemon is not responding anymore. */
1517 goto end_nosignal;
1518 }
9ce5646a 1519 health_code_update();
6dc3064a
DG
1520 break;
1521 }
3bd1e081
MD
1522 default:
1523 break;
1524 }
3f8e211f 1525
3bd1e081 1526end_nosignal:
b0b335c8 1527 rcu_read_unlock();
4cbc1a04 1528
9ce5646a
MD
1529 health_code_update();
1530
4cbc1a04
DG
1531 /*
1532 * Return 1 to indicate success since the 0 value can be a socket
1533 * shutdown during the recv() or send() call.
1534 */
1535 return 1;
ffe60014
DG
1536
1537end_msg_sessiond:
1538 /*
1539 * The returned value here is not useful since either way we'll return 1 to
1540 * the caller because the session daemon socket management is done
1541 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1542 */
489f70e9
MD
1543 ret = consumer_send_status_msg(sock, ret_code);
1544 if (ret < 0) {
1545 goto error_fatal;
1546 }
ffe60014 1547 rcu_read_unlock();
9ce5646a
MD
1548
1549 health_code_update();
1550
ffe60014
DG
1551 return 1;
1552end_channel_error:
1553 if (channel) {
1554 /*
1555 * Free channel here since no one has a reference to it. We don't
1556 * free after that because a stream can store this pointer.
1557 */
1558 destroy_channel(channel);
1559 }
1560 /* We have to send a status channel message indicating an error. */
1561 ret = consumer_send_status_channel(sock, NULL);
1562 if (ret < 0) {
1563 /* Stop everything if session daemon can not be notified. */
1564 goto error_fatal;
1565 }
1566 rcu_read_unlock();
9ce5646a
MD
1567
1568 health_code_update();
1569
ffe60014
DG
1570 return 1;
1571error_fatal:
1572 rcu_read_unlock();
1573 /* This will issue a consumer stop. */
1574 return -1;
3bd1e081
MD
1575}
1576
ffe60014
DG
1577/*
1578 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1579 * compiled out, we isolate it in this library.
1580 */
1581int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
1582 unsigned long *off)
3bd1e081 1583{
ffe60014
DG
1584 assert(stream);
1585 assert(stream->ustream);
b5c5fc29 1586
ffe60014 1587 return ustctl_get_mmap_read_offset(stream->ustream, off);
3bd1e081
MD
1588}
1589
ffe60014
DG
1590/*
1591 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1592 * compiled out, we isolate it in this library.
1593 */
1594void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
d056b477 1595{
ffe60014
DG
1596 assert(stream);
1597 assert(stream->ustream);
1598
1599 return ustctl_get_mmap_base(stream->ustream);
d056b477
MD
1600}
1601
ffe60014
DG
1602/*
1603 * Take a snapshot for a specific fd
1604 *
1605 * Returns 0 on success, < 0 on error
1606 */
1607int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 1608{
ffe60014
DG
1609 assert(stream);
1610 assert(stream->ustream);
1611
1612 return ustctl_snapshot(stream->ustream);
3bd1e081
MD
1613}
1614
ffe60014
DG
1615/*
1616 * Get the produced position
1617 *
1618 * Returns 0 on success, < 0 on error
1619 */
1620int lttng_ustconsumer_get_produced_snapshot(
1621 struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 1622{
ffe60014
DG
1623 assert(stream);
1624 assert(stream->ustream);
1625 assert(pos);
7a57cf92 1626
ffe60014
DG
1627 return ustctl_snapshot_get_produced(stream->ustream, pos);
1628}
7a57cf92 1629
10a50311
JD
1630/*
1631 * Get the consumed position
1632 *
1633 * Returns 0 on success, < 0 on error
1634 */
1635int lttng_ustconsumer_get_consumed_snapshot(
1636 struct lttng_consumer_stream *stream, unsigned long *pos)
1637{
1638 assert(stream);
1639 assert(stream->ustream);
1640 assert(pos);
1641
1642 return ustctl_snapshot_get_consumed(stream->ustream, pos);
1643}
1644
84a182ce
DG
1645void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
1646 int producer)
1647{
1648 assert(stream);
1649 assert(stream->ustream);
1650
1651 ustctl_flush_buffer(stream->ustream, producer);
1652}
1653
1654int lttng_ustconsumer_get_current_timestamp(
1655 struct lttng_consumer_stream *stream, uint64_t *ts)
1656{
1657 assert(stream);
1658 assert(stream->ustream);
1659 assert(ts);
1660
1661 return ustctl_get_current_timestamp(stream->ustream, ts);
1662}
1663
ffe60014
DG
1664/*
1665 * Called when the stream signal the consumer that it has hang up.
1666 */
1667void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
1668{
1669 assert(stream);
1670 assert(stream->ustream);
2c1dd183 1671
ffe60014
DG
1672 ustctl_flush_buffer(stream->ustream, 0);
1673 stream->hangup_flush_done = 1;
1674}
ee77a7b0 1675
ffe60014
DG
1676void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
1677{
1678 assert(chan);
1679 assert(chan->uchan);
e316aad5 1680
ea88ca2a
MD
1681 if (chan->switch_timer_enabled == 1) {
1682 consumer_timer_switch_stop(chan);
1683 }
1684 consumer_metadata_cache_destroy(chan);
ffe60014 1685 ustctl_destroy_channel(chan->uchan);
3d071855
MD
1686 /* Try to rmdir all directories under shm_path root. */
1687 if (chan->root_shm_path[0]) {
1688 (void) utils_recursive_rmdir(chan->root_shm_path);
1689 }
3bd1e081
MD
1690}
1691
1692void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
1693{
ffe60014
DG
1694 assert(stream);
1695 assert(stream->ustream);
d41f73b7 1696
ea88ca2a
MD
1697 if (stream->chan->switch_timer_enabled == 1) {
1698 consumer_timer_switch_stop(stream->chan);
1699 }
ffe60014
DG
1700 ustctl_destroy_stream(stream->ustream);
1701}
d41f73b7 1702
6d574024
DG
1703int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
1704{
1705 assert(stream);
1706 assert(stream->ustream);
1707
1708 return ustctl_stream_get_wakeup_fd(stream->ustream);
1709}
1710
1711int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
1712{
1713 assert(stream);
1714 assert(stream->ustream);
1715
1716 return ustctl_stream_close_wakeup_fd(stream->ustream);
1717}
1718
309167d2
JD
1719/*
1720 * Populate index values of a UST stream. Values are set in big endian order.
1721 *
1722 * Return 0 on success or else a negative value.
1723 */
50adc264 1724static int get_index_values(struct ctf_packet_index *index,
309167d2
JD
1725 struct ustctl_consumer_stream *ustream)
1726{
1727 int ret;
1728
1729 ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin);
1730 if (ret < 0) {
1731 PERROR("ustctl_get_timestamp_begin");
1732 goto error;
1733 }
1734 index->timestamp_begin = htobe64(index->timestamp_begin);
1735
1736 ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end);
1737 if (ret < 0) {
1738 PERROR("ustctl_get_timestamp_end");
1739 goto error;
1740 }
1741 index->timestamp_end = htobe64(index->timestamp_end);
1742
1743 ret = ustctl_get_events_discarded(ustream, &index->events_discarded);
1744 if (ret < 0) {
1745 PERROR("ustctl_get_events_discarded");
1746 goto error;
1747 }
1748 index->events_discarded = htobe64(index->events_discarded);
1749
1750 ret = ustctl_get_content_size(ustream, &index->content_size);
1751 if (ret < 0) {
1752 PERROR("ustctl_get_content_size");
1753 goto error;
1754 }
1755 index->content_size = htobe64(index->content_size);
1756
1757 ret = ustctl_get_packet_size(ustream, &index->packet_size);
1758 if (ret < 0) {
1759 PERROR("ustctl_get_packet_size");
1760 goto error;
1761 }
1762 index->packet_size = htobe64(index->packet_size);
1763
1764 ret = ustctl_get_stream_id(ustream, &index->stream_id);
1765 if (ret < 0) {
1766 PERROR("ustctl_get_stream_id");
1767 goto error;
1768 }
1769 index->stream_id = htobe64(index->stream_id);
1770
1771error:
1772 return ret;
1773}
1774
94d49140
JD
1775/*
1776 * Write up to one packet from the metadata cache to the channel.
1777 *
1778 * Returns the number of bytes pushed in the cache, or a negative value
1779 * on error.
1780 */
1781static
1782int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
1783{
1784 ssize_t write_len;
1785 int ret;
1786
1787 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
1788 if (stream->chan->metadata_cache->contiguous
1789 == stream->ust_metadata_pushed) {
1790 ret = 0;
1791 goto end;
1792 }
1793
1794 write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
1795 &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
1796 stream->chan->metadata_cache->contiguous
1797 - stream->ust_metadata_pushed);
1798 assert(write_len != 0);
1799 if (write_len < 0) {
1800 ERR("Writing one metadata packet");
1801 ret = -1;
1802 goto end;
1803 }
1804 stream->ust_metadata_pushed += write_len;
1805
1806 assert(stream->chan->metadata_cache->contiguous >=
1807 stream->ust_metadata_pushed);
1808 ret = write_len;
1809
1810end:
1811 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
1812 return ret;
1813}
1814
309167d2 1815
94d49140
JD
1816/*
1817 * Sync metadata meaning request them to the session daemon and snapshot to the
1818 * metadata thread can consumer them.
1819 *
1820 * Metadata stream lock MUST be acquired.
1821 *
1822 * Return 0 if new metadatda is available, EAGAIN if the metadata stream
1823 * is empty or a negative value on error.
1824 */
1825int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
1826 struct lttng_consumer_stream *metadata)
1827{
1828 int ret;
1829 int retry = 0;
1830
1831 assert(ctx);
1832 assert(metadata);
1833
1834 /*
1835 * Request metadata from the sessiond, but don't wait for the flush
1836 * because we locked the metadata thread.
1837 */
1838 ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
1839 if (ret < 0) {
1840 goto end;
1841 }
1842
1843 ret = commit_one_metadata_packet(metadata);
1844 if (ret <= 0) {
1845 goto end;
1846 } else if (ret > 0) {
1847 retry = 1;
1848 }
1849
1850 ustctl_flush_buffer(metadata->ustream, 1);
1851 ret = ustctl_snapshot(metadata->ustream);
1852 if (ret < 0) {
1853 if (errno != EAGAIN) {
1854 ERR("Sync metadata, taking UST snapshot");
1855 goto end;
1856 }
1857 DBG("No new metadata when syncing them.");
1858 /* No new metadata, exit. */
1859 ret = ENODATA;
1860 goto end;
1861 }
1862
1863 /*
1864 * After this flush, we still need to extract metadata.
1865 */
1866 if (retry) {
1867 ret = EAGAIN;
1868 }
1869
1870end:
1871 return ret;
1872}
1873
02b3d176
DG
1874/*
1875 * Return 0 on success else a negative value.
1876 */
1877static int notify_if_more_data(struct lttng_consumer_stream *stream,
1878 struct lttng_consumer_local_data *ctx)
1879{
1880 int ret;
1881 struct ustctl_consumer_stream *ustream;
1882
1883 assert(stream);
1884 assert(ctx);
1885
1886 ustream = stream->ustream;
1887
1888 /*
1889 * First, we are going to check if there is a new subbuffer available
1890 * before reading the stream wait_fd.
1891 */
1892 /* Get the next subbuffer */
1893 ret = ustctl_get_next_subbuf(ustream);
1894 if (ret) {
1895 /* No more data found, flag the stream. */
1896 stream->has_data = 0;
1897 ret = 0;
1898 goto end;
1899 }
1900
5420e5db 1901 ret = ustctl_put_subbuf(ustream);
02b3d176
DG
1902 assert(!ret);
1903
1904 /* This stream still has data. Flag it and wake up the data thread. */
1905 stream->has_data = 1;
1906
1907 if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
1908 ssize_t writelen;
1909
1910 writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
1911 if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
1912 ret = writelen;
1913 goto end;
1914 }
1915
1916 /* The wake up pipe has been notified. */
1917 ctx->has_wakeup = 1;
1918 }
1919 ret = 0;
1920
1921end:
1922 return ret;
1923}
1924
94d49140
JD
1925/*
1926 * Read subbuffer from the given stream.
1927 *
1928 * Stream lock MUST be acquired.
1929 *
1930 * Return 0 on success else a negative value.
1931 */
d41f73b7
MD
1932int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
1933 struct lttng_consumer_local_data *ctx)
1934{
1d4dfdef 1935 unsigned long len, subbuf_size, padding;
1c20f0e2 1936 int err, write_index = 1;
d41f73b7 1937 long ret = 0;
ffe60014 1938 struct ustctl_consumer_stream *ustream;
50adc264 1939 struct ctf_packet_index index;
ffe60014
DG
1940
1941 assert(stream);
1942 assert(stream->ustream);
1943 assert(ctx);
d41f73b7 1944
3eb914c0 1945 DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
ffe60014
DG
1946 stream->name);
1947
1948 /* Ease our life for what's next. */
1949 ustream = stream->ustream;
d41f73b7 1950
6cd525e8 1951 /*
02b3d176
DG
1952 * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
1953 * error if we cannot read this one byte (read returns 0), or if the error
1954 * is EAGAIN or EWOULDBLOCK.
1955 *
1956 * This is only done when the stream is monitored by a thread, before the
1957 * flush is done after a hangup and if the stream is not flagged with data
1958 * since there might be nothing to consume in the wait fd but still have
1959 * data available flagged by the consumer wake up pipe.
6cd525e8 1960 */
02b3d176
DG
1961 if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
1962 char dummy;
c617c0c6
MD
1963 ssize_t readlen;
1964
6cd525e8
MD
1965 readlen = lttng_read(stream->wait_fd, &dummy, 1);
1966 if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
effcf122
MD
1967 ret = readlen;
1968 goto end;
1969 }
d41f73b7
MD
1970 }
1971
04ef1097 1972retry:
d41f73b7 1973 /* Get the next subbuffer */
ffe60014 1974 err = ustctl_get_next_subbuf(ustream);
d41f73b7 1975 if (err != 0) {
04ef1097
MD
1976 /*
1977 * Populate metadata info if the existing info has
1978 * already been read.
1979 */
1980 if (stream->metadata_flag) {
94d49140
JD
1981 ret = commit_one_metadata_packet(stream);
1982 if (ret <= 0) {
04ef1097
MD
1983 goto end;
1984 }
04ef1097
MD
1985 ustctl_flush_buffer(stream->ustream, 1);
1986 goto retry;
1987 }
1988
1d4dfdef 1989 ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
d41f73b7
MD
1990 /*
1991 * This is a debug message even for single-threaded consumer,
1992 * because poll() have more relaxed criterions than get subbuf,
1993 * so get_subbuf may fail for short race windows where poll()
1994 * would issue wakeups.
1995 */
1996 DBG("Reserving sub buffer failed (everything is normal, "
ffe60014 1997 "it is due to concurrency) [ret: %d]", err);
d41f73b7
MD
1998 goto end;
1999 }
ffe60014 2000 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
309167d2 2001
1c20f0e2 2002 if (!stream->metadata_flag) {
309167d2
JD
2003 index.offset = htobe64(stream->out_fd_offset);
2004 ret = get_index_values(&index, ustream);
2005 if (ret < 0) {
2006 goto end;
2007 }
1c20f0e2
JD
2008 } else {
2009 write_index = 0;
309167d2
JD
2010 }
2011
1d4dfdef 2012 /* Get the full padded subbuffer size */
ffe60014 2013 err = ustctl_get_padded_subbuf_size(ustream, &len);
effcf122 2014 assert(err == 0);
1d4dfdef
DG
2015
2016 /* Get subbuffer data size (without padding) */
ffe60014 2017 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
1d4dfdef
DG
2018 assert(err == 0);
2019
2020 /* Make sure we don't get a subbuffer size bigger than the padded */
2021 assert(len >= subbuf_size);
2022
2023 padding = len - subbuf_size;
d41f73b7 2024 /* write the subbuffer to the tracefile */
309167d2 2025 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
91dfef6e
DG
2026 /*
2027 * The mmap operation should write subbuf_size amount of data when network
2028 * streaming or the full padding (len) size when we are _not_ streaming.
2029 */
d88aee68
DG
2030 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
2031 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
d41f73b7 2032 /*
91dfef6e 2033 * Display the error but continue processing to try to release the
c5c45efa
DG
2034 * subbuffer. This is a DBG statement since any unexpected kill or
2035 * signal, the application gets unregistered, relayd gets closed or
2036 * anything that affects the buffer lifetime will trigger this error.
2037 * So, for the sake of the user, don't print this error since it can
2038 * happen and it is OK with the code flow.
d41f73b7 2039 */
c5c45efa 2040 DBG("Error writing to tracefile "
8fd623e0 2041 "(ret: %ld != len: %lu != subbuf_size: %lu)",
91dfef6e 2042 ret, len, subbuf_size);
309167d2 2043 write_index = 0;
d41f73b7 2044 }
ffe60014 2045 err = ustctl_put_next_subbuf(ustream);
effcf122 2046 assert(err == 0);
331744e3 2047
02b3d176
DG
2048 /*
2049 * This will consumer the byte on the wait_fd if and only if there is not
2050 * next subbuffer to be acquired.
2051 */
2052 if (!stream->metadata_flag) {
2053 ret = notify_if_more_data(stream, ctx);
2054 if (ret < 0) {
2055 goto end;
2056 }
2057 }
2058
309167d2 2059 /* Write index if needed. */
1c20f0e2
JD
2060 if (!write_index) {
2061 goto end;
2062 }
2063
94d49140
JD
2064 if (stream->chan->live_timer_interval && !stream->metadata_flag) {
2065 /*
2066 * In live, block until all the metadata is sent.
2067 */
2068 err = consumer_stream_sync_metadata(ctx, stream->session_id);
2069 if (err < 0) {
2070 goto end;
2071 }
2072 }
2073
1c20f0e2
JD
2074 assert(!stream->metadata_flag);
2075 err = consumer_stream_write_index(stream, &index);
2076 if (err < 0) {
2077 goto end;
309167d2
JD
2078 }
2079
d41f73b7
MD
2080end:
2081 return ret;
2082}
2083
ffe60014
DG
2084/*
2085 * Called when a stream is created.
fe4477ee
JD
2086 *
2087 * Return 0 on success or else a negative value.
ffe60014 2088 */
d41f73b7
MD
2089int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
2090{
fe4477ee
JD
2091 int ret;
2092
10a50311
JD
2093 assert(stream);
2094
fe4477ee 2095 /* Don't create anything if this is set for streaming. */
10a50311 2096 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
fe4477ee
JD
2097 ret = utils_create_stream_file(stream->chan->pathname, stream->name,
2098 stream->chan->tracefile_size, stream->tracefile_count_current,
309167d2 2099 stream->uid, stream->gid, NULL);
fe4477ee
JD
2100 if (ret < 0) {
2101 goto error;
2102 }
2103 stream->out_fd = ret;
2104 stream->tracefile_size_current = 0;
309167d2
JD
2105
2106 if (!stream->metadata_flag) {
2107 ret = index_create_file(stream->chan->pathname,
2108 stream->name, stream->uid, stream->gid,
2109 stream->chan->tracefile_size,
2110 stream->tracefile_count_current);
2111 if (ret < 0) {
2112 goto error;
2113 }
2114 stream->index_fd = ret;
2115 }
fe4477ee
JD
2116 }
2117 ret = 0;
2118
2119error:
2120 return ret;
d41f73b7 2121}
ca22feea
DG
2122
2123/*
2124 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
2125 * stream. Consumer data lock MUST be acquired before calling this function
2126 * and the stream lock.
ca22feea 2127 *
6d805429 2128 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
2129 * data is available for trace viewer reading.
2130 */
6d805429 2131int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
2132{
2133 int ret;
2134
2135 assert(stream);
ffe60014 2136 assert(stream->ustream);
ca22feea 2137
6d805429 2138 DBG("UST consumer checking data pending");
c8f59ee5 2139
ca6b395f
MD
2140 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
2141 ret = 0;
2142 goto end;
2143 }
2144
04ef1097 2145 if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
e6ee4eab
DG
2146 uint64_t contiguous, pushed;
2147
2148 /* Ease our life a bit. */
2149 contiguous = stream->chan->metadata_cache->contiguous;
2150 pushed = stream->ust_metadata_pushed;
2151
04ef1097
MD
2152 /*
2153 * We can simply check whether all contiguously available data
2154 * has been pushed to the ring buffer, since the push operation
2155 * is performed within get_next_subbuf(), and because both
2156 * get_next_subbuf() and put_next_subbuf() are issued atomically
2157 * thanks to the stream lock within
2158 * lttng_ustconsumer_read_subbuffer(). This basically means that
2159 * whetnever ust_metadata_pushed is incremented, the associated
2160 * metadata has been consumed from the metadata stream.
2161 */
2162 DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
e6ee4eab 2163 contiguous, pushed);
aa01b94c 2164 assert(((int64_t) (contiguous - pushed)) >= 0);
e6ee4eab 2165 if ((contiguous != pushed) ||
6acdf328 2166 (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
04ef1097
MD
2167 ret = 1; /* Data is pending */
2168 goto end;
2169 }
2170 } else {
2171 ret = ustctl_get_next_subbuf(stream->ustream);
2172 if (ret == 0) {
2173 /*
2174 * There is still data so let's put back this
2175 * subbuffer.
2176 */
2177 ret = ustctl_put_subbuf(stream->ustream);
2178 assert(ret == 0);
2179 ret = 1; /* Data is pending */
2180 goto end;
2181 }
ca22feea
DG
2182 }
2183
6d805429
DG
2184 /* Data is NOT pending so ready to be read. */
2185 ret = 0;
ca22feea 2186
6efae65e
DG
2187end:
2188 return ret;
ca22feea 2189}
d88aee68 2190
6d574024
DG
2191/*
2192 * Stop a given metadata channel timer if enabled and close the wait fd which
2193 * is the poll pipe of the metadata stream.
2194 *
2195 * This MUST be called with the metadata channel acquired.
2196 */
2197void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
2198{
2199 int ret;
2200
2201 assert(metadata);
2202 assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
2203
2204 DBG("Closing metadata channel key %" PRIu64, metadata->key);
2205
2206 if (metadata->switch_timer_enabled == 1) {
2207 consumer_timer_switch_stop(metadata);
2208 }
2209
2210 if (!metadata->metadata_stream) {
2211 goto end;
2212 }
2213
2214 /*
2215 * Closing write side so the thread monitoring the stream wakes up if any
2216 * and clean the metadata stream.
2217 */
2218 if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
2219 ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
2220 if (ret < 0) {
2221 PERROR("closing metadata pipe write side");
2222 }
2223 metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
2224 }
2225
2226end:
2227 return;
2228}
2229
d88aee68
DG
2230/*
2231 * Close every metadata stream wait fd of the metadata hash table. This
2232 * function MUST be used very carefully so not to run into a race between the
2233 * metadata thread handling streams and this function closing their wait fd.
2234 *
2235 * For UST, this is used when the session daemon hangs up. Its the metadata
2236 * producer so calling this is safe because we are assured that no state change
2237 * can occur in the metadata thread for the streams in the hash table.
2238 */
6d574024 2239void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
d88aee68 2240{
d88aee68
DG
2241 struct lttng_ht_iter iter;
2242 struct lttng_consumer_stream *stream;
2243
2244 assert(metadata_ht);
2245 assert(metadata_ht->ht);
2246
2247 DBG("UST consumer closing all metadata streams");
2248
2249 rcu_read_lock();
2250 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
2251 node.node) {
9ce5646a
MD
2252
2253 health_code_update();
2254
be2b50c7 2255 pthread_mutex_lock(&stream->chan->lock);
6d574024 2256 lttng_ustconsumer_close_metadata(stream->chan);
be2b50c7
DG
2257 pthread_mutex_unlock(&stream->chan->lock);
2258
d88aee68
DG
2259 }
2260 rcu_read_unlock();
2261}
d8ef542d
MD
2262
2263void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
2264{
2265 int ret;
2266
2267 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
2268 if (ret < 0) {
2269 ERR("Unable to close wakeup fd");
2270 }
2271}
331744e3 2272
f666ae70
MD
2273/*
2274 * Please refer to consumer-timer.c before adding any lock within this
2275 * function or any of its callees. Timers have a very strict locking
2276 * semantic with respect to teardown. Failure to respect this semantic
2277 * introduces deadlocks.
2278 */
331744e3 2279int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
94d49140 2280 struct lttng_consumer_channel *channel, int timer, int wait)
331744e3
JD
2281{
2282 struct lttcomm_metadata_request_msg request;
2283 struct lttcomm_consumer_msg msg;
0c759fc9 2284 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
331744e3
JD
2285 uint64_t len, key, offset;
2286 int ret;
2287
2288 assert(channel);
2289 assert(channel->metadata_cache);
2290
53efb85a
MD
2291 memset(&request, 0, sizeof(request));
2292
331744e3
JD
2293 /* send the metadata request to sessiond */
2294 switch (consumer_data.type) {
2295 case LTTNG_CONSUMER64_UST:
2296 request.bits_per_long = 64;
2297 break;
2298 case LTTNG_CONSUMER32_UST:
2299 request.bits_per_long = 32;
2300 break;
2301 default:
2302 request.bits_per_long = 0;
2303 break;
2304 }
2305
2306 request.session_id = channel->session_id;
1950109e 2307 request.session_id_per_pid = channel->session_id_per_pid;
567eb353
DG
2308 /*
2309 * Request the application UID here so the metadata of that application can
2310 * be sent back. The channel UID corresponds to the user UID of the session
2311 * used for the rights on the stream file(s).
2312 */
2313 request.uid = channel->ust_app_uid;
331744e3 2314 request.key = channel->key;
567eb353 2315
1950109e 2316 DBG("Sending metadata request to sessiond, session id %" PRIu64
567eb353
DG
2317 ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64,
2318 request.session_id, request.session_id_per_pid, request.uid,
2319 request.key);
331744e3 2320
75d83e50 2321 pthread_mutex_lock(&ctx->metadata_socket_lock);
9ce5646a
MD
2322
2323 health_code_update();
2324
331744e3
JD
2325 ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
2326 sizeof(request));
2327 if (ret < 0) {
2328 ERR("Asking metadata to sessiond");
2329 goto end;
2330 }
2331
9ce5646a
MD
2332 health_code_update();
2333
331744e3
JD
2334 /* Receive the metadata from sessiond */
2335 ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
2336 sizeof(msg));
2337 if (ret != sizeof(msg)) {
8fd623e0 2338 DBG("Consumer received unexpected message size %d (expects %zu)",
331744e3
JD
2339 ret, sizeof(msg));
2340 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
2341 /*
2342 * The ret value might 0 meaning an orderly shutdown but this is ok
2343 * since the caller handles this.
2344 */
2345 goto end;
2346 }
2347
9ce5646a
MD
2348 health_code_update();
2349
331744e3
JD
2350 if (msg.cmd_type == LTTNG_ERR_UND) {
2351 /* No registry found */
2352 (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
2353 ret_code);
2354 ret = 0;
2355 goto end;
2356 } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
2357 ERR("Unexpected cmd_type received %d", msg.cmd_type);
2358 ret = -1;
2359 goto end;
2360 }
2361
2362 len = msg.u.push_metadata.len;
2363 key = msg.u.push_metadata.key;
2364 offset = msg.u.push_metadata.target_offset;
2365
2366 assert(key == channel->key);
2367 if (len == 0) {
2368 DBG("No new metadata to receive for key %" PRIu64, key);
2369 }
2370
9ce5646a
MD
2371 health_code_update();
2372
331744e3
JD
2373 /* Tell session daemon we are ready to receive the metadata. */
2374 ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
0c759fc9 2375 LTTCOMM_CONSUMERD_SUCCESS);
331744e3
JD
2376 if (ret < 0 || len == 0) {
2377 /*
2378 * Somehow, the session daemon is not responding anymore or there is
2379 * nothing to receive.
2380 */
2381 goto end;
2382 }
2383
9ce5646a
MD
2384 health_code_update();
2385
1eb682be 2386 ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
94d49140 2387 key, offset, len, channel, timer, wait);
1eb682be 2388 if (ret >= 0) {
f2a444f1
DG
2389 /*
2390 * Only send the status msg if the sessiond is alive meaning a positive
2391 * ret code.
2392 */
1eb682be 2393 (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret);
f2a444f1 2394 }
331744e3
JD
2395 ret = 0;
2396
2397end:
9ce5646a
MD
2398 health_code_update();
2399
75d83e50 2400 pthread_mutex_unlock(&ctx->metadata_socket_lock);
331744e3
JD
2401 return ret;
2402}
70190e1c
DG
2403
2404/*
2405 * Return the ustctl call for the get stream id.
2406 */
2407int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
2408 uint64_t *stream_id)
2409{
2410 assert(stream);
2411 assert(stream_id);
2412
2413 return ustctl_get_stream_id(stream->ustream, stream_id);
2414}
This page took 0.279399 seconds and 4 git commands to generate.