[lttng-tools.git] / src/common/ust-consumer/ust-consumer.cpp
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
28ab034a
JG
11#include "ust-consumer.hpp"
12
13#include <common/common.hpp>
14#include <common/compat/endian.hpp>
15#include <common/compat/fcntl.hpp>
16#include <common/consumer/consumer-metadata-cache.hpp>
17#include <common/consumer/consumer-stream.hpp>
18#include <common/consumer/consumer-timer.hpp>
19#include <common/consumer/consumer.hpp>
20#include <common/index/index.hpp>
21#include <common/optional.hpp>
22#include <common/relayd/relayd.hpp>
23#include <common/sessiond-comm/sessiond-comm.hpp>
24#include <common/shm.hpp>
25#include <common/utils.hpp>
26
f02e1e8a 27#include <lttng/ust-ctl.h>
881fc67f 28#include <lttng/ust-sigbus.h>
28ab034a
JG
29
30#include <bin/lttng-consumerd/health-consumerd.hpp>
31#include <inttypes.h>
3bd1e081
MD
32#include <poll.h>
33#include <pthread.h>
28ab034a
JG
34#include <signal.h>
35#include <stdbool.h>
36#include <stdint.h>
3bd1e081
MD
37#include <stdlib.h>
38#include <string.h>
39#include <sys/mman.h>
40#include <sys/socket.h>
dbb5dfe6 41#include <sys/stat.h>
3bd1e081
MD
42#include <sys/types.h>
43#include <unistd.h>
ffe60014 44#include <urcu/list.h>
0857097f 45
28ab034a 46#define INT_MAX_STR_LEN 12 /* includes \0 */
4628484a 47
fa29bfbf 48extern struct lttng_consumer_global_data the_consumer_data;
3bd1e081 49extern int consumer_poll_timeout;
3bd1e081 50
4bd69c5f 51LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE();
881fc67f 52
3bd1e081 53/*
ffe60014 54 * Add channel to internal consumer state.
3bd1e081 55 *
ffe60014 56 * Returns 0 on success or else a negative value.
3bd1e081 57 */
ffe60014 58static int add_channel(struct lttng_consumer_channel *channel,
28ab034a 59 struct lttng_consumer_local_data *ctx)
3bd1e081
MD
60{
61 int ret = 0;
62
a0377dfe
FD
63 LTTNG_ASSERT(channel);
64 LTTNG_ASSERT(ctx);
ffe60014
DG
65
66 if (ctx->on_recv_channel != NULL) {
67 ret = ctx->on_recv_channel(channel);
68 if (ret == 0) {
d8ef542d 69 ret = consumer_add_channel(channel, ctx);
ffe60014
DG
70 } else if (ret < 0) {
71 /* Most likely an ENOMEM. */
72 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
73 goto error;
74 }
75 } else {
d8ef542d 76 ret = consumer_add_channel(channel, ctx);
3bd1e081
MD
77 }
78
d88aee68 79 DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
ffe60014
DG
80
81error:
3bd1e081
MD
82 return ret;
83}
84
ffe60014
DG
85/*
86 * Allocate and return a consumer stream object. If _alloc_ret is not NULL,
87 * the allocation error code (if any) is stored in it; otherwise it is left untouched.
3bd1e081 88 *
ffe60014 89 * Return NULL on error else the newly allocated stream object.
3bd1e081 90 */
28ab034a
JG
91static struct lttng_consumer_stream *allocate_stream(int cpu,
92 int key,
93 struct lttng_consumer_channel *channel,
94 struct lttng_consumer_local_data *ctx,
95 int *_alloc_ret)
ffe60014
DG
96{
97 int alloc_ret;
98 struct lttng_consumer_stream *stream = NULL;
99
a0377dfe
FD
100 LTTNG_ASSERT(channel);
101 LTTNG_ASSERT(ctx);
ffe60014 102
28ab034a
JG
103 stream = consumer_stream_create(channel,
104 channel->key,
105 key,
106 channel->name,
107 channel->relayd_id,
108 channel->session_id,
109 channel->trace_chunk,
110 cpu,
111 &alloc_ret,
112 channel->type,
113 channel->monitor);
ffe60014
DG
114 if (stream == NULL) {
115 switch (alloc_ret) {
116 case -ENOENT:
117 /*
118 * We could not find the channel. Can happen if cpu hotplug
119 * happens while tearing down.
120 */
121 DBG3("Could not find channel");
122 break;
123 case -ENOMEM:
124 case -EINVAL:
125 default:
126 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
127 break;
128 }
129 goto error;
130 }
131
d9a2e16e 132 consumer_stream_update_channel_attributes(stream, channel);
ffe60014
DG
133
134error:
135 if (_alloc_ret) {
136 *_alloc_ret = alloc_ret;
137 }
138 return stream;
139}
140
141/*
142 * Send the given stream pointer to the corresponding thread.
143 *
144 * Returns 0 on success else a negative value.
145 */
146static int send_stream_to_thread(struct lttng_consumer_stream *stream,
28ab034a 147 struct lttng_consumer_local_data *ctx)
ffe60014 148{
dae10966
DG
149 int ret;
150 struct lttng_pipe *stream_pipe;
ffe60014
DG
151
152 /* Get the right pipe where the stream will be sent. */
153 if (stream->metadata_flag) {
66d583dc 154 consumer_add_metadata_stream(stream);
dae10966 155 stream_pipe = ctx->consumer_metadata_pipe;
ffe60014 156 } else {
66d583dc 157 consumer_add_data_stream(stream);
dae10966 158 stream_pipe = ctx->consumer_data_pipe;
ffe60014
DG
159 }
160
5ab66908
MD
161 /*
162 * From this point on, the stream's ownership has been moved away from
a8086cf4
JR
163 * the channel and it becomes globally visible. Hence, remove it from
164 * the local stream list to prevent the stream from being both local and
165 * global.
5ab66908
MD
166 */
167 stream->globally_visible = 1;
5c5e3d71 168 cds_list_del_init(&stream->send_node);
5ab66908 169
dae10966 170 ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
ffe60014 171 if (ret < 0) {
dae10966 172 ERR("Consumer write %s stream to pipe %d",
28ab034a
JG
173 stream->metadata_flag ? "metadata" : "data",
174 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
175 if (stream->metadata_flag) {
176 consumer_del_stream_for_metadata(stream);
177 } else {
178 consumer_del_stream_for_data(stream);
179 }
a8086cf4 180 goto error;
ffe60014 181 }
a8086cf4 182
5ab66908 183error:
ffe60014
DG
184 return ret;
185}
186
28ab034a 187static int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
4628484a 188{
28ab034a 189 char cpu_nr[INT_MAX_STR_LEN]; /* int max len */
4628484a
MD
190 int ret;
191
192 strncpy(stream_shm_path, shm_path, PATH_MAX);
193 stream_shm_path[PATH_MAX - 1] = '\0';
45863397 194 ret = snprintf(cpu_nr, INT_MAX_STR_LEN, "%i", cpu);
67f8cb8d
MD
195 if (ret < 0) {
196 PERROR("snprintf");
4628484a
MD
197 goto end;
198 }
28ab034a 199 strncat(stream_shm_path, cpu_nr, PATH_MAX - strlen(stream_shm_path) - 1);
4628484a
MD
200 ret = 0;
201end:
202 return ret;
203}
204
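/*
 * Worked example (hypothetical values): with shm_path = "/tmp/ust-shm/chan"
 * and cpu = 2, the snprintf/strncat sequence above yields
 * stream_shm_path = "/tmp/ust-shm/chan2". The helper only appends the CPU
 * number; truncation to PATH_MAX is handled by strncpy()/strncat().
 */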
d88aee68
DG
205/*
206 * Create streams for the given channel using liblttng-ust-ctl.
d2956687 207 * The channel lock must be acquired by the caller.
d88aee68
DG
208 *
209 * Return 0 on success else a negative value.
210 */
ffe60014 211static int create_ust_streams(struct lttng_consumer_channel *channel,
28ab034a 212 struct lttng_consumer_local_data *ctx)
ffe60014
DG
213{
214 int ret, cpu = 0;
b623cb6a 215 struct lttng_ust_ctl_consumer_stream *ustream;
ffe60014 216 struct lttng_consumer_stream *stream;
d2956687 217 pthread_mutex_t *current_stream_lock = NULL;
ffe60014 218
a0377dfe
FD
219 LTTNG_ASSERT(channel);
220 LTTNG_ASSERT(ctx);
ffe60014
DG
221
222 /*
223 * Create streams as long as ustctl returns one. When NULL is returned, we
224 * have reached the end of the possible streams for the channel.
225 */
b623cb6a 226 while ((ustream = lttng_ust_ctl_create_stream(channel->uchan, cpu))) {
ffe60014 227 int wait_fd;
04ef1097 228 int ust_metadata_pipe[2];
ffe60014 229
9ce5646a
MD
230 health_code_update();
231
04ef1097
MD
232 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
233 ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
234 if (ret < 0) {
235 ERR("Create ust metadata poll pipe");
236 goto error;
237 }
238 wait_fd = ust_metadata_pipe[0];
239 } else {
b623cb6a 240 wait_fd = lttng_ust_ctl_stream_get_wait_fd(ustream);
04ef1097 241 }
ffe60014
DG
242
243 /* Allocate consumer stream object. */
d2956687 244 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
ffe60014
DG
245 if (!stream) {
246 goto error_alloc;
247 }
248 stream->ustream = ustream;
249 /*
250 * Store it to avoid repeated function calls later, since this value is
251 * used heavily in the stream threads. This is UST specific, which is why
252 * it is done after allocation.
253 */
254 stream->wait_fd = wait_fd;
255
b31398bb
DG
256 /*
257 * Increment channel refcount since the channel reference has now been
258 * assigned in the allocation process above.
259 */
10a50311
JD
260 if (stream->chan->monitor) {
261 uatomic_inc(&stream->chan->refcount);
262 }
b31398bb 263
d2956687
JG
264 pthread_mutex_lock(&stream->lock);
265 current_stream_lock = &stream->lock;
ffe60014
DG
266 /*
267 * Order is important, which is why a list is used. On error, the caller
268 * should clean up this list.
269 */
270 cds_list_add_tail(&stream->send_node, &channel->streams.head);
271
28ab034a 272 ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, &stream->max_sb_size);
ffe60014 273 if (ret < 0) {
28ab034a 274 ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", stream->name);
ffe60014
DG
275 goto error;
276 }
277
278 /* Do actions once stream has been received. */
279 if (ctx->on_recv_stream) {
280 ret = ctx->on_recv_stream(stream);
281 if (ret < 0) {
282 goto error;
283 }
284 }
285
d88aee68 286 DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
28ab034a
JG
287 stream->name,
288 stream->key,
289 stream->relayd_stream_id);
ffe60014
DG
290
291 /* Set next CPU stream. */
292 channel->streams.count = ++cpu;
d88aee68
DG
293
294 /* Keep stream reference when creating metadata. */
295 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
296 channel->metadata_stream = stream;
8de4f941
JG
297 if (channel->monitor) {
298 /* Set metadata poll pipe if we created one */
299 memcpy(stream->ust_metadata_poll_pipe,
28ab034a
JG
300 ust_metadata_pipe,
301 sizeof(ust_metadata_pipe));
8de4f941 302 }
d88aee68 303 }
d2956687
JG
304 pthread_mutex_unlock(&stream->lock);
305 current_stream_lock = NULL;
ffe60014
DG
306 }
307
308 return 0;
309
310error:
311error_alloc:
d2956687
JG
312 if (current_stream_lock) {
313 pthread_mutex_unlock(current_stream_lock);
314 }
ffe60014
DG
315 return ret;
316}
317
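/*
 * Usage sketch (this is how ask_channel() below calls it; shown here only
 * as an illustration): the channel lock must be held by the caller and the
 * created streams stay on channel->streams.head until they are handed over
 * to the consumer threads.
 *
 *   pthread_mutex_lock(&channel->lock);
 *   ret = create_ust_streams(channel, ctx);
 *   pthread_mutex_unlock(&channel->lock);
 */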
28ab034a
JG
318static int open_ust_stream_fd(struct lttng_consumer_channel *channel,
319 int cpu,
320 const struct lttng_credentials *session_credentials)
4628484a
MD
321{
322 char shm_path[PATH_MAX];
323 int ret;
324
325 if (!channel->shm_path[0]) {
b7fc068d 326 return shm_create_anonymous("ust-consumer");
4628484a
MD
327 }
328 ret = get_stream_shm_path(shm_path, channel->shm_path, cpu);
329 if (ret) {
330 goto error_shm_path;
331 }
332 return run_as_open(shm_path,
28ab034a
JG
333 O_RDWR | O_CREAT | O_EXCL,
334 S_IRUSR | S_IWUSR,
335 lttng_credentials_get_uid(session_credentials),
336 lttng_credentials_get_gid(session_credentials));
4628484a
MD
337
338error_shm_path:
339 return -1;
340}
341
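/*
 * Two cases are handled above: without a channel shm_path the stream buffer
 * is backed by an anonymous shm object (shm_create_anonymous()); otherwise a
 * per-CPU file is created with the session credentials through run_as_open().
 * E.g., for a hypothetical shm_path "/tmp/ust-shm/chan" and cpu = 1, the
 * returned fd refers to "/tmp/ust-shm/chan1".
 */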
ffe60014
DG
342/*
343 * Create an UST channel with the given attributes and send it to the session
344 * daemon using the ust ctl API.
345 *
346 * Return 0 on success or else a negative value.
347 */
4628484a 348static int create_ust_channel(struct lttng_consumer_channel *channel,
28ab034a
JG
349 struct lttng_ust_ctl_consumer_channel_attr *attr,
350 struct lttng_ust_ctl_consumer_channel **ust_chanp)
ffe60014 351{
4628484a
MD
352 int ret, nr_stream_fds, i, j;
353 int *stream_fds;
b623cb6a 354 struct lttng_ust_ctl_consumer_channel *ust_channel;
ffe60014 355
a0377dfe
FD
356 LTTNG_ASSERT(channel);
357 LTTNG_ASSERT(attr);
358 LTTNG_ASSERT(ust_chanp);
359 LTTNG_ASSERT(channel->buffer_credentials.is_set);
ffe60014
DG
360
361 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
28ab034a
JG
362 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
363 "switch_timer_interval: %u, read_timer_interval: %u, "
364 "output: %d, type: %d",
365 attr->overwrite,
366 attr->subbuf_size,
367 attr->num_subbuf,
368 attr->switch_timer_interval,
369 attr->read_timer_interval,
370 attr->output,
371 attr->type);
ffe60014 372
4628484a
MD
373 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA)
374 nr_stream_fds = 1;
375 else
b623cb6a 376 nr_stream_fds = lttng_ust_ctl_get_nr_stream_per_channel();
64803277 377 stream_fds = calloc<int>(nr_stream_fds);
4628484a
MD
378 if (!stream_fds) {
379 ret = -1;
380 goto error_alloc;
381 }
382 for (i = 0; i < nr_stream_fds; i++) {
28ab034a 383 stream_fds[i] = open_ust_stream_fd(channel, i, &channel->buffer_credentials.value);
4628484a
MD
384 if (stream_fds[i] < 0) {
385 ret = -1;
386 goto error_open;
387 }
388 }
b623cb6a 389 ust_channel = lttng_ust_ctl_create_channel(attr, stream_fds, nr_stream_fds);
4628484a 390 if (!ust_channel) {
ffe60014
DG
391 ret = -1;
392 goto error_create;
393 }
4628484a
MD
394 channel->nr_stream_fds = nr_stream_fds;
395 channel->stream_fds = stream_fds;
396 *ust_chanp = ust_channel;
ffe60014
DG
397
398 return 0;
399
400error_create:
4628484a
MD
401error_open:
402 for (j = i - 1; j >= 0; j--) {
403 int closeret;
404
405 closeret = close(stream_fds[j]);
406 if (closeret) {
407 PERROR("close");
408 }
409 if (channel->shm_path[0]) {
410 char shm_path[PATH_MAX];
411
28ab034a 412 closeret = get_stream_shm_path(shm_path, channel->shm_path, j);
4628484a
MD
413 if (closeret) {
414 ERR("Cannot get stream shm path");
415 }
416 closeret = run_as_unlink(shm_path,
28ab034a
JG
417 lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
418 channel->buffer_credentials)),
419 lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
420 channel->buffer_credentials)));
4628484a 421 if (closeret) {
4628484a
MD
422 PERROR("unlink %s", shm_path);
423 }
424 }
425 }
426 /* Try to rmdir all directories under shm_path root. */
427 if (channel->root_shm_path[0]) {
602766ec 428 (void) run_as_rmdir_recursive(channel->root_shm_path,
28ab034a
JG
429 lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
430 channel->buffer_credentials)),
431 lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
432 channel->buffer_credentials)),
433 LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
4628484a
MD
434 }
435 free(stream_fds);
436error_alloc:
ffe60014
DG
437 return ret;
438}
439
d88aee68
DG
440/*
441 * Send a single given stream to the session daemon using the sock.
442 *
443 * Return 0 on success else a negative value.
444 */
ffe60014
DG
445static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
446{
447 int ret;
448
a0377dfe
FD
449 LTTNG_ASSERT(stream);
450 LTTNG_ASSERT(sock >= 0);
ffe60014 451
3eb914c0 452 DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
ffe60014
DG
453
454 /* Send stream to session daemon. */
b623cb6a 455 ret = lttng_ust_ctl_send_stream_to_sessiond(sock, stream->ustream);
ffe60014
DG
456 if (ret < 0) {
457 goto error;
458 }
459
ffe60014
DG
460error:
461 return ret;
462}
463
464/*
a3a86f35 465 * Send channel to sessiond and relayd if applicable.
ffe60014 466 *
d88aee68 467 * Return 0 on success or else a negative value.
ffe60014 468 */
a3a86f35 469static int send_channel_to_sessiond_and_relayd(int sock,
28ab034a
JG
470 struct lttng_consumer_channel *channel,
471 struct lttng_consumer_local_data *ctx,
472 int *relayd_error)
ffe60014 473{
0c759fc9 474 int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ffe60014 475 struct lttng_consumer_stream *stream;
a4baae1b 476 uint64_t net_seq_idx = -1ULL;
ffe60014 477
a0377dfe
FD
478 LTTNG_ASSERT(channel);
479 LTTNG_ASSERT(ctx);
480 LTTNG_ASSERT(sock >= 0);
ffe60014
DG
481
482 DBG("UST consumer sending channel %s to sessiond", channel->name);
483
62285ea4 484 if (channel->relayd_id != (uint64_t) -1ULL) {
28ab034a 485 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
9ce5646a
MD
486 health_code_update();
487
62285ea4 488 /* Try to send the stream to the relayd if one is available. */
a3a86f35 489 DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd",
28ab034a
JG
490 stream->key,
491 channel->name);
62285ea4
DG
492 ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
493 if (ret < 0) {
494 /*
495 * Flag that the relayd was the problem here, probably due to a
496 * communication error on the socket.
497 */
498 if (relayd_error) {
499 *relayd_error = 1;
500 }
725d28b2 501 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
ffe60014 502 }
a4baae1b
JD
503 if (net_seq_idx == -1ULL) {
504 net_seq_idx = stream->net_seq_idx;
505 }
506 }
f2a444f1 507 }
ffe60014 508
f2a444f1
DG
509 /* Inform sessiond that we are about to send channel and streams. */
510 ret = consumer_send_status_msg(sock, ret_code);
0c759fc9 511 if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
f2a444f1
DG
512 /*
513 * Either the session daemon is not responding or the relayd died so we
514 * stop now.
515 */
516 goto error;
517 }
518
519 /* Send channel to sessiond. */
b623cb6a 520 ret = lttng_ust_ctl_send_channel_to_sessiond(sock, channel->uchan);
f2a444f1
DG
521 if (ret < 0) {
522 goto error;
523 }
524
b623cb6a 525 ret = lttng_ust_ctl_channel_close_wakeup_fd(channel->uchan);
f2a444f1
DG
526 if (ret < 0) {
527 goto error;
528 }
529
530 /* The channel was sent successfully to the sessiond at this point. */
28ab034a 531 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
9ce5646a
MD
532 health_code_update();
533
ffe60014
DG
534 /* Send stream to session daemon. */
535 ret = send_sessiond_stream(sock, stream);
536 if (ret < 0) {
537 goto error;
538 }
539 }
540
541 /* Tell sessiond there is no more stream. */
b623cb6a 542 ret = lttng_ust_ctl_send_stream_to_sessiond(sock, NULL);
ffe60014
DG
543 if (ret < 0) {
544 goto error;
545 }
546
547 DBG("UST consumer NULL stream sent to sessiond");
548
549 return 0;
550
551error:
0c759fc9 552 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
f2a444f1
DG
553 ret = -1;
554 }
ffe60014
DG
555 return ret;
556}
557
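/*
 * Message sequence implemented above (summary sketch):
 *   1. if the channel has a relayd id, send every stream to the relayd;
 *   2. report a status code to the sessiond (consumer_send_status_msg());
 *   3. send the channel object (lttng_ust_ctl_send_channel_to_sessiond());
 *   4. send each stream (send_sessiond_stream());
 *   5. send a NULL stream to signal the end of the stream list.
 */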
558/*
559 * Creates a channel and streams and adds the channel to the internal consumer
560 * state. The created streams must ONLY be sent once the GET_CHANNEL command is
561 * received.
562 *
563 * Return 0 on success or else, a negative value is returned and the channel
564 * MUST be destroyed by consumer_del_channel().
565 */
75cfe9e6 566static int ask_channel(struct lttng_consumer_local_data *ctx,
28ab034a
JG
567 struct lttng_consumer_channel *channel,
568 struct lttng_ust_ctl_consumer_channel_attr *attr)
3bd1e081
MD
569{
570 int ret;
571
a0377dfe
FD
572 LTTNG_ASSERT(ctx);
573 LTTNG_ASSERT(channel);
574 LTTNG_ASSERT(attr);
ffe60014
DG
575
576 /*
577 * This value is still used by the kernel consumer since, for the kernel,
578 * the stream ownership is not IN the consumer, so we need to know the
579 * number of streams left to be initialized in order to know when to
580 * delete the channel (see consumer.c).
581 *
582 * As for the user space tracer, the consumer creates and sends the
583 * streams to the session daemon, which only sends them to the application
584 * once every stream of a channel has been received, making this value
585 * useless because the streams will be added to the poll thread before the
586 * application receives them. This ensures that a stream cannot hang up
587 * during initialization of a channel.
588 */
589 channel->nb_init_stream_left = 0;
590
591 /* The reply msg status is handled in the following call. */
4628484a 592 ret = create_ust_channel(channel, attr, &channel->uchan);
ffe60014 593 if (ret < 0) {
10a50311 594 goto end;
3bd1e081
MD
595 }
596
b623cb6a 597 channel->wait_fd = lttng_ust_ctl_channel_get_wait_fd(channel->uchan);
d8ef542d 598
10a50311
JD
599 /*
600 * For the snapshots (no monitor), we create the metadata streams
601 * on demand, not during the channel creation.
602 */
603 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
604 ret = 0;
605 goto end;
606 }
607
ffe60014 608 /* Open all streams for this channel. */
d2956687
JG
609 pthread_mutex_lock(&channel->lock);
610 ret = create_ust_streams(channel, ctx);
611 pthread_mutex_unlock(&channel->lock);
ffe60014 612 if (ret < 0) {
10a50311 613 goto end;
ffe60014
DG
614 }
615
10a50311 616end:
3bd1e081
MD
617 return ret;
618}
619
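/*
 * Command flow sketch: the session daemon first issues
 * LTTNG_CONSUMER_ASK_CHANNEL_CREATION, which lands here and creates the
 * channel and its streams, and later issues LTTNG_CONSUMER_GET_CHANNEL, at
 * which point the streams are sent out (see the handlers in
 * lttng_ustconsumer_recv_cmd() below).
 */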
d88aee68
DG
620/*
621 * Send all streams of a channel to the right thread handling them.
622 *
623 * On error, return a negative value else 0 on success.
624 */
625static int send_streams_to_thread(struct lttng_consumer_channel *channel,
28ab034a 626 struct lttng_consumer_local_data *ctx)
d88aee68
DG
627{
628 int ret = 0;
629 struct lttng_consumer_stream *stream, *stmp;
630
a0377dfe
FD
631 LTTNG_ASSERT(channel);
632 LTTNG_ASSERT(ctx);
d88aee68
DG
633
634 /* Send streams to the corresponding thread. */
28ab034a 635 cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
9ce5646a
MD
636 health_code_update();
637
d88aee68
DG
638 /* Sending the stream to the thread. */
639 ret = send_stream_to_thread(stream, ctx);
640 if (ret < 0) {
641 /*
642 * If we are unable to send the stream to the thread, there is
643 * a big problem so just stop everything.
644 */
645 goto error;
646 }
d88aee68
DG
647 }
648
649error:
650 return ret;
651}
652
7972aab2
DG
653/*
654 * Flush channel's streams using the given key to retrieve the channel.
655 *
656 * Return 0 on success else an LTTng error code.
657 */
658static int flush_channel(uint64_t chan_key)
659{
660 int ret = 0;
661 struct lttng_consumer_channel *channel;
662 struct lttng_consumer_stream *stream;
663 struct lttng_ht *ht;
664 struct lttng_ht_iter iter;
665
8fd623e0 666 DBG("UST consumer flush channel key %" PRIu64, chan_key);
7972aab2 667
a500c257 668 rcu_read_lock();
7972aab2
DG
669 channel = consumer_find_channel(chan_key);
670 if (!channel) {
8fd623e0 671 ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
7972aab2
DG
672 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
673 goto error;
674 }
675
fa29bfbf 676 ht = the_consumer_data.stream_per_chan_id_ht;
7972aab2
DG
677
678 /* For each stream of the channel id, flush it. */
7972aab2 679 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
680 ht->hash_fct(&channel->key, lttng_ht_seed),
681 ht->match_fct,
682 &channel->key,
683 &iter.iter,
684 stream,
685 node_channel_id.node)
686 {
9ce5646a
MD
687 health_code_update();
688
0dd01979 689 pthread_mutex_lock(&stream->lock);
5cfcab67
JR
690
691 /*
692 * Protect against concurrent teardown of a stream.
693 */
694 if (cds_lfht_is_node_deleted(&stream->node.node)) {
695 goto next;
696 }
697
0dd01979 698 if (!stream->quiescent) {
881fc67f
MD
699 ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
700 if (ret) {
28ab034a
JG
701 ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64
702 ", channel name = '%s'",
703 chan_key,
704 channel->name);
881fc67f
MD
705 ret = LTTNG_ERR_BUFFER_FLUSH_FAILED;
706 pthread_mutex_unlock(&stream->lock);
707 goto error;
708 }
0dd01979
MD
709 stream->quiescent = true;
710 }
28ab034a 711 next:
0dd01979
MD
712 pthread_mutex_unlock(&stream->lock);
713 }
9cc4ae91
JG
714
715 /*
716 * Send one last buffer statistics update to the session daemon. This
717 * ensures that the session daemon gets at least one statistics update
718 * per channel even in the case of short-lived channels, such as when a
719 * short-lived app is traced in per-pid mode.
720 */
721 sample_and_send_channel_buffer_stats(channel);
0dd01979
MD
722error:
723 rcu_read_unlock();
724 return ret;
725}
726
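/*
 * Note on idempotence (sketch): a stream is flushed at most once per
 * quiescent period. The first flush sets stream->quiescent, subsequent
 * FLUSH_CHANNEL commands skip it, and clear_quiescent_channel() below
 * resets the flag so that a later flush acts again.
 */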
727/*
728 * Clear quiescent state from channel's streams using the given key to
729 * retrieve the channel.
730 *
731 * Return 0 on success else an LTTng error code.
732 */
733static int clear_quiescent_channel(uint64_t chan_key)
734{
735 int ret = 0;
736 struct lttng_consumer_channel *channel;
737 struct lttng_consumer_stream *stream;
738 struct lttng_ht *ht;
739 struct lttng_ht_iter iter;
740
741 DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
742
743 rcu_read_lock();
744 channel = consumer_find_channel(chan_key);
745 if (!channel) {
746 ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
747 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
748 goto error;
749 }
750
fa29bfbf 751 ht = the_consumer_data.stream_per_chan_id_ht;
0dd01979
MD
752
753 /* For each stream of the channel id, clear quiescent state. */
754 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
755 ht->hash_fct(&channel->key, lttng_ht_seed),
756 ht->match_fct,
757 &channel->key,
758 &iter.iter,
759 stream,
760 node_channel_id.node)
761 {
0dd01979
MD
762 health_code_update();
763
764 pthread_mutex_lock(&stream->lock);
765 stream->quiescent = false;
766 pthread_mutex_unlock(&stream->lock);
7972aab2 767 }
7972aab2 768error:
a500c257 769 rcu_read_unlock();
7972aab2
DG
770 return ret;
771}
772
d88aee68
DG
773/*
774 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
775 *
776 * Return 0 on success else an LTTng error code.
777 */
778static int close_metadata(uint64_t chan_key)
779{
ea88ca2a 780 int ret = 0;
d88aee68 781 struct lttng_consumer_channel *channel;
f65a74be 782 unsigned int channel_monitor;
d88aee68 783
8fd623e0 784 DBG("UST consumer close metadata key %" PRIu64, chan_key);
d88aee68
DG
785
786 channel = consumer_find_channel(chan_key);
787 if (!channel) {
84cc9aa0
DG
788 /*
789 * This is possible if the metadata thread has issued a delete because
790 * the endpoint of the stream hung up. There is no way the
791 * session daemon can know about it thus use a DBG instead of an actual
792 * error.
793 */
794 DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
d88aee68
DG
795 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
796 goto error;
797 }
798
fa29bfbf 799 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 800 pthread_mutex_lock(&channel->lock);
f65a74be 801 channel_monitor = channel->monitor;
73811ecc
DG
802 if (cds_lfht_is_node_deleted(&channel->node.node)) {
803 goto error_unlock;
804 }
805
6d574024 806 lttng_ustconsumer_close_metadata(channel);
f65a74be 807 pthread_mutex_unlock(&channel->lock);
fa29bfbf 808 pthread_mutex_unlock(&the_consumer_data.lock);
d88aee68 809
f65a74be
JG
810 /*
811 * The ownership of a metadata channel depends on the type of
812 * session to which it belongs. In effect, the monitor flag is checked
813 * to determine if this metadata channel is in "snapshot" mode or not.
814 *
815 * In the non-snapshot case, the metadata channel is created along with
816 * a single stream which will remain present until the metadata channel
817 * is destroyed (on the destruction of its session). In this case, the
818 * metadata stream is "monitored" by the metadata poll thread and holds
819 * the ownership of its channel.
820 *
821 * Closing the metadata will cause the metadata stream's "metadata poll
822 * pipe" to be closed. Closing this pipe will wake up the metadata poll
823 * thread, which will tear down the metadata stream which, in turn,
824 * deletes the metadata channel.
825 *
826 * In the snapshot case, the metadata stream is created and destroyed
827 * on every snapshot record. Since the channel doesn't have an owner
828 * other than the session daemon, it is safe to destroy it immediately
829 * on reception of the CLOSE_METADATA command.
830 */
831 if (!channel_monitor) {
832 /*
833 * The channel and consumer_data locks must be
834 * released before this call since consumer_del_channel
835 * re-acquires the channel and consumer_data locks to teardown
836 * the channel and queue its reclamation by the "call_rcu"
837 * worker thread.
838 */
839 consumer_del_channel(channel);
840 }
841
842 return ret;
ea88ca2a 843error_unlock:
a9838785 844 pthread_mutex_unlock(&channel->lock);
fa29bfbf 845 pthread_mutex_unlock(&the_consumer_data.lock);
d88aee68
DG
846error:
847 return ret;
848}
849
850/*
851 * RCU read side lock MUST be acquired before calling this function.
852 *
853 * Return 0 on success else an LTTng error code.
854 */
855static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
856{
857 int ret;
858 struct lttng_consumer_channel *metadata;
859
48b7cdc2
FD
860 ASSERT_RCU_READ_LOCKED();
861
8fd623e0 862 DBG("UST consumer setup metadata key %" PRIu64, key);
d88aee68
DG
863
864 metadata = consumer_find_channel(key);
865 if (!metadata) {
866 ERR("UST consumer push metadata %" PRIu64 " not found", key);
867 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
10a50311
JD
868 goto end;
869 }
870
871 /*
872 * In no monitor mode, the metadata channel has no stream(s) so skip the
873 * ownership transfer to the metadata thread.
874 */
875 if (!metadata->monitor) {
876 DBG("Metadata channel in no monitor");
877 ret = 0;
878 goto end;
d88aee68
DG
879 }
880
881 /*
882 * Send metadata stream to relayd if one available. Availability is
883 * known if the stream is still in the list of the channel.
884 */
885 if (cds_list_empty(&metadata->streams.head)) {
886 ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
887 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
f5a0c9cf 888 goto error_no_stream;
d88aee68
DG
889 }
890
891 /* Send metadata stream to relayd if needed. */
62285ea4 892 if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
28ab034a 893 ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname);
62285ea4
DG
894 if (ret < 0) {
895 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
896 goto error;
897 }
28ab034a 898 ret = consumer_send_relayd_streams_sent(metadata->metadata_stream->net_seq_idx);
601262d6
JD
899 if (ret < 0) {
900 ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
901 goto error;
902 }
d88aee68
DG
903 }
904
a8086cf4
JR
905 /*
906 * Ownership of metadata stream is passed along. Freeing is handled by
907 * the callee.
908 */
d88aee68
DG
909 ret = send_streams_to_thread(metadata, ctx);
910 if (ret < 0) {
911 /*
912 * If we are unable to send the stream to the thread, there is
913 * a big problem so just stop everything.
914 */
915 ret = LTTCOMM_CONSUMERD_FATAL;
a8086cf4 916 goto send_streams_error;
d88aee68
DG
917 }
918 /* List MUST be empty after or else it could be reused. */
a0377dfe 919 LTTNG_ASSERT(cds_list_empty(&metadata->streams.head));
d88aee68 920
10a50311
JD
921 ret = 0;
922 goto end;
d88aee68
DG
923
924error:
f2a444f1
DG
925 /*
926 * Delete metadata channel on error. At this point, the metadata stream can
927 * NOT be monitored by the metadata thread thus having the guarantee that
928 * the stream is still in the local stream list of the channel. This call
929 * will make sure to clean that list.
930 */
f5a0c9cf 931 consumer_stream_destroy(metadata->metadata_stream, NULL);
212d67a2 932 metadata->metadata_stream = NULL;
a8086cf4 933send_streams_error:
f5a0c9cf 934error_no_stream:
10a50311
JD
935end:
936 return ret;
937}
938
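/*
 * Ownership sketch: on success the metadata stream has been handed to the
 * metadata poll thread through send_streams_to_thread(); on error it is
 * destroyed here, which is why metadata->metadata_stream is reset to NULL
 * in the error path above.
 */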
939/*
940 * Snapshot the whole metadata.
d2956687 941 * RCU read-side lock must be held by the caller.
10a50311
JD
942 *
943 * Returns 0 on success, < 0 on error
944 */
3eb928aa 945static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
28ab034a
JG
946 uint64_t key,
947 char *path,
948 uint64_t relayd_id,
949 struct lttng_consumer_local_data *ctx)
10a50311
JD
950{
951 int ret = 0;
10a50311
JD
952 struct lttng_consumer_stream *metadata_stream;
953
a0377dfe
FD
954 LTTNG_ASSERT(path);
955 LTTNG_ASSERT(ctx);
48b7cdc2 956 ASSERT_RCU_READ_LOCKED();
10a50311 957
28ab034a 958 DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
10a50311
JD
959
960 rcu_read_lock();
961
a0377dfe 962 LTTNG_ASSERT(!metadata_channel->monitor);
10a50311 963
9ce5646a
MD
964 health_code_update();
965
10a50311
JD
966 /*
967 * Ask the sessiond if we have new metadata waiting and update the
968 * consumer metadata cache.
969 */
94d49140 970 ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
10a50311
JD
971 if (ret < 0) {
972 goto error;
973 }
974
9ce5646a
MD
975 health_code_update();
976
10a50311
JD
977 /*
978 * The metadata stream is NOT created in no monitor mode when the channel
979 * is created on a sessiond ask channel command.
980 */
d2956687 981 ret = create_ust_streams(metadata_channel, ctx);
10a50311
JD
982 if (ret < 0) {
983 goto error;
984 }
985
986 metadata_stream = metadata_channel->metadata_stream;
a0377dfe 987 LTTNG_ASSERT(metadata_stream);
10a50311 988
947bd097 989 metadata_stream->read_subbuffer_ops.lock(metadata_stream);
10a50311
JD
990 if (relayd_id != (uint64_t) -1ULL) {
991 metadata_stream->net_seq_idx = relayd_id;
992 ret = consumer_send_relayd_stream(metadata_stream, path);
10a50311 993 } else {
28ab034a 994 ret = consumer_stream_create_output_files(metadata_stream, false);
d2956687 995 }
d2956687
JG
996 if (ret < 0) {
997 goto error_stream;
10a50311
JD
998 }
999
04ef1097 1000 do {
9ce5646a 1001 health_code_update();
6f9449c2 1002 ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
10a50311 1003 if (ret < 0) {
94d49140 1004 goto error_stream;
10a50311 1005 }
04ef1097 1006 } while (ret > 0);
10a50311 1007
10a50311 1008error_stream:
947bd097 1009 metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
10a50311 1010 /*
947bd097
JR
1011 * Clean up the stream completely because the next snapshot will use a
1012 * new metadata stream.
10a50311 1013 */
10a50311
JD
1014 consumer_stream_destroy(metadata_stream, NULL);
1015 metadata_channel->metadata_stream = NULL;
1016
1017error:
1018 rcu_read_unlock();
1019 return ret;
1020}
1021
28ab034a 1022static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr)
128708c3
JG
1023{
1024 int ret;
1025 unsigned long mmap_offset;
1026 const char *mmap_base;
1027
97535efa 1028 mmap_base = (const char *) lttng_ust_ctl_get_mmap_base(stream->ustream);
128708c3 1029 if (!mmap_base) {
28ab034a 1030 ERR("Failed to get mmap base for stream `%s`", stream->name);
128708c3
JG
1031 ret = -EPERM;
1032 goto error;
1033 }
1034
b623cb6a 1035 ret = lttng_ust_ctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
128708c3
JG
1036 if (ret != 0) {
1037 ERR("Failed to get mmap offset for stream `%s`", stream->name);
1038 ret = -EINVAL;
1039 goto error;
1040 }
1041
1042 *addr = mmap_base + mmap_offset;
1043error:
1044 return ret;
128708c3
JG
1045}
1046
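/*
 * Worked example (hypothetical values): if the mmap base is 0x7f0000000000
 * and the read offset is 0x4000, *addr points to 0x7f0000004000. The
 * snapshot code below wraps this address in an lttng_buffer_view covering
 * the padded sub-buffer size.
 */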
10a50311
JD
1047/*
1048 * Take a snapshot of all the streams of a channel.
d2956687 1049 * RCU read-side lock and the channel lock must be held by the caller.
10a50311
JD
1050 *
1051 * Returns 0 on success, < 0 on error
1052 */
3eb928aa 1053static int snapshot_channel(struct lttng_consumer_channel *channel,
28ab034a
JG
1054 uint64_t key,
1055 char *path,
1056 uint64_t relayd_id,
1057 uint64_t nb_packets_per_stream,
1058 struct lttng_consumer_local_data *ctx)
10a50311
JD
1059{
1060 int ret;
1061 unsigned use_relayd = 0;
1062 unsigned long consumed_pos, produced_pos;
10a50311
JD
1063 struct lttng_consumer_stream *stream;
1064
a0377dfe
FD
1065 LTTNG_ASSERT(path);
1066 LTTNG_ASSERT(ctx);
48b7cdc2 1067 ASSERT_RCU_READ_LOCKED();
10a50311
JD
1068
1069 rcu_read_lock();
1070
1071 if (relayd_id != (uint64_t) -1ULL) {
1072 use_relayd = 1;
1073 }
1074
a0377dfe 1075 LTTNG_ASSERT(!channel->monitor);
6a00837f 1076 DBG("UST consumer snapshot channel %" PRIu64, key);
10a50311 1077
28ab034a 1078 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
9ce5646a
MD
1079 health_code_update();
1080
10a50311
JD
1081 /* Lock stream because we are about to change its state. */
1082 pthread_mutex_lock(&stream->lock);
a0377dfe 1083 LTTNG_ASSERT(channel->trace_chunk);
d2956687
JG
1084 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
1085 /*
1086 * Can't happen barring an internal error as the channel
1087 * holds a reference to the trace chunk.
1088 */
1089 ERR("Failed to acquire reference to channel's trace chunk");
1090 ret = -1;
1091 goto error_unlock;
1092 }
a0377dfe 1093 LTTNG_ASSERT(!stream->trace_chunk);
d2956687
JG
1094 stream->trace_chunk = channel->trace_chunk;
1095
10a50311
JD
1096 stream->net_seq_idx = relayd_id;
1097
1098 if (use_relayd) {
1099 ret = consumer_send_relayd_stream(stream, path);
1100 if (ret < 0) {
a4beac76 1101 goto error_close_stream;
10a50311
JD
1102 }
1103 } else {
28ab034a 1104 ret = consumer_stream_create_output_files(stream, false);
10a50311 1105 if (ret < 0) {
a4beac76 1106 goto error_close_stream;
10a50311 1107 }
28ab034a 1108 DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
10a50311
JD
1109 }
1110
d4d80f77
MD
1111 /*
1112 * If tracing is active, we want to perform a "full" buffer flush.
1113 * Else, if quiescent, it has already been done by the prior stop.
1114 */
1115 if (!stream->quiescent) {
881fc67f
MD
1116 ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
1117 if (ret < 0) {
28ab034a
JG
1118 ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64
1119 ", channel name = '%s'",
1120 channel->key,
1121 channel->name);
881fc67f
MD
1122 goto error_unlock;
1123 }
d4d80f77 1124 }
10a50311
JD
1125
1126 ret = lttng_ustconsumer_take_snapshot(stream);
1127 if (ret < 0) {
1128 ERR("Taking UST snapshot");
a4beac76 1129 goto error_close_stream;
10a50311
JD
1130 }
1131
1132 ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
1133 if (ret < 0) {
1134 ERR("Produced UST snapshot position");
a4beac76 1135 goto error_close_stream;
10a50311
JD
1136 }
1137
1138 ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
1139 if (ret < 0) {
1140 ERR("Consumerd UST snapshot position");
a4beac76 1141 goto error_close_stream;
10a50311
JD
1142 }
1143
5c786ded
JD
1144 /*
1145 * The original value is sent back if max stream size is larger than
d07ceecd 1146 * the possible size of the snapshot. Also, we assume that the session
5c786ded
JD
1147 * daemon should never send a maximum stream size that is lower than
1148 * subbuffer size.
1149 */
28ab034a
JG
1150 consumed_pos = consumer_get_consume_start_pos(
1151 consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size);
5c786ded 1152
9377d830 1153 while ((long) (consumed_pos - produced_pos) < 0) {
10a50311
JD
1154 ssize_t read_len;
1155 unsigned long len, padded_len;
128708c3 1156 const char *subbuf_addr;
fd424d99 1157 struct lttng_buffer_view subbuf_view;
10a50311 1158
9ce5646a
MD
1159 health_code_update();
1160
10a50311
JD
1161 DBG("UST consumer taking snapshot at pos %lu", consumed_pos);
1162
b623cb6a 1163 ret = lttng_ust_ctl_get_subbuf(stream->ustream, &consumed_pos);
10a50311
JD
1164 if (ret < 0) {
1165 if (ret != -EAGAIN) {
b623cb6a 1166 PERROR("lttng_ust_ctl_get_subbuf snapshot");
10a50311
JD
1167 goto error_close_stream;
1168 }
1169 DBG("UST consumer get subbuf failed. Skipping it.");
1170 consumed_pos += stream->max_sb_size;
ddc93ee4 1171 stream->chan->lost_packets++;
10a50311
JD
1172 continue;
1173 }
1174
b623cb6a 1175 ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len);
10a50311 1176 if (ret < 0) {
b623cb6a 1177 ERR("Snapshot lttng_ust_ctl_get_subbuf_size");
10a50311
JD
1178 goto error_put_subbuf;
1179 }
1180
b623cb6a 1181 ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len);
10a50311 1182 if (ret < 0) {
b623cb6a 1183 ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size");
10a50311
JD
1184 goto error_put_subbuf;
1185 }
1186
128708c3
JG
1187 ret = get_current_subbuf_addr(stream, &subbuf_addr);
1188 if (ret) {
1189 goto error_put_subbuf;
1190 }
1191
28ab034a 1192 subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
f5ba75b4 1193 read_len = lttng_consumer_on_read_subbuffer_mmap(
28ab034a 1194 stream, &subbuf_view, padded_len - len);
10a50311
JD
1195 if (use_relayd) {
1196 if (read_len != len) {
56591bac 1197 ret = -EPERM;
10a50311
JD
1198 goto error_put_subbuf;
1199 }
1200 } else {
1201 if (read_len != padded_len) {
56591bac 1202 ret = -EPERM;
10a50311
JD
1203 goto error_put_subbuf;
1204 }
1205 }
1206
b623cb6a 1207 ret = lttng_ust_ctl_put_subbuf(stream->ustream);
10a50311 1208 if (ret < 0) {
b623cb6a 1209 ERR("Snapshot lttng_ust_ctl_put_subbuf");
10a50311
JD
1210 goto error_close_stream;
1211 }
1212 consumed_pos += stream->max_sb_size;
1213 }
1214
1215 /* Simply close the stream so we can use it on the next snapshot. */
d119bd01 1216 consumer_stream_close_output(stream);
10a50311
JD
1217 pthread_mutex_unlock(&stream->lock);
1218 }
1219
1220 rcu_read_unlock();
1221 return 0;
1222
1223error_put_subbuf:
b623cb6a
MJ
1224 if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
1225 ERR("Snapshot lttng_ust_ctl_put_subbuf");
10a50311
JD
1226 }
1227error_close_stream:
d119bd01 1228 consumer_stream_close_output(stream);
10a50311
JD
1229error_unlock:
1230 pthread_mutex_unlock(&stream->lock);
10a50311 1231 rcu_read_unlock();
d88aee68
DG
1232 return ret;
1233}
1234
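/*
 * Per-stream snapshot algorithm used above (summary sketch):
 *   1. flush the buffer unless the stream is already quiescent;
 *   2. sample the produced and consumed positions;
 *   3. iterate sub-buffers from the computed start position up to the
 *      produced position with get_subbuf()/put_subbuf(), writing each one
 *      out through lttng_consumer_on_read_subbuffer_mmap();
 *   4. close the stream output so it can be used again on the next snapshot.
 */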
28ab034a 1235static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)
b1316da1
JG
1236{
1237 ASSERT_LOCKED(stream->lock);
1238
28ab034a 1239 DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id);
b1316da1
JG
1240 stream->ust_metadata_pushed = 0;
1241}
1242
331744e3 1243/*
c585821b
MD
1244 * Receive the metadata updates from the sessiond. Supports receiving
1245 * overlapping metadata, but it always needs to belong to a contiguous
1246 * range starting from 0.
1247 * Be careful about the locks held when calling this function: it needs
1248 * the metadata cache flush to concurrently progress in order to
1249 * complete.
331744e3 1250 */
28ab034a
JG
1251int lttng_ustconsumer_recv_metadata(int sock,
1252 uint64_t key,
1253 uint64_t offset,
1254 uint64_t len,
1255 uint64_t version,
1256 struct lttng_consumer_channel *channel,
1257 int timer,
1258 int wait)
331744e3 1259{
0c759fc9 1260 int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
331744e3 1261 char *metadata_str;
b1316da1 1262 enum consumer_metadata_cache_write_status cache_write_status;
331744e3 1263
8fd623e0 1264 DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
331744e3 1265
64803277 1266 metadata_str = calloc<char>(len);
331744e3
JD
1267 if (!metadata_str) {
1268 PERROR("zmalloc metadata string");
1269 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
1270 goto end;
1271 }
1272
9ce5646a
MD
1273 health_code_update();
1274
331744e3
JD
1275 /* Receive metadata string. */
1276 ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
1277 if (ret < 0) {
1278 /* Session daemon is dead so return gracefully. */
1279 ret_code = ret;
1280 goto end_free;
1281 }
1282
9ce5646a
MD
1283 health_code_update();
1284
331744e3 1285 pthread_mutex_lock(&channel->metadata_cache->lock);
b1316da1 1286 cache_write_status = consumer_metadata_cache_write(
28ab034a 1287 channel->metadata_cache, offset, len, version, metadata_str);
3bdc49f3 1288 pthread_mutex_unlock(&channel->metadata_cache->lock);
b1316da1
JG
1289 switch (cache_write_status) {
1290 case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
1291 /*
1292 * The write entirely overlapped with existing contents of the
1293 * same metadata version (same content); there is nothing to do.
1294 */
1295 break;
1296 case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
1297 /*
1298 * The metadata cache was invalidated (previously pushed
1299 * content has been overwritten). Reset the stream's consumed
1300 * metadata position to ensure the metadata poll thread consumes
1301 * the whole cache.
1302 */
947bd097
JR
1303
1304 /*
1305 * channel::metadata_stream can be null when the metadata
1306 * channel is under a snapshot session type. No need to update
1307 * the stream position in that scenario.
1308 */
1309 if (channel->metadata_stream != NULL) {
1310 pthread_mutex_lock(&channel->metadata_stream->lock);
28ab034a 1311 metadata_stream_reset_cache_consumed_position(channel->metadata_stream);
947bd097
JR
1312 pthread_mutex_unlock(&channel->metadata_stream->lock);
1313 } else {
1314 /* Validate we are in snapshot mode. */
1315 LTTNG_ASSERT(!channel->monitor);
1316 }
b1316da1
JG
1317 /* Fall-through. */
1318 case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
1319 /*
1320 * In both cases, the metadata poll thread has new data to
1321 * consume.
1322 */
1323 ret = consumer_metadata_wakeup_pipe(channel);
1324 if (ret) {
1325 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
1326 goto end_free;
1327 }
1328 break;
1329 case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
331744e3
JD
1330 /* Unable to handle metadata. Notify session daemon. */
1331 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
a32bd775
DG
1332 /*
1333 * Skip metadata flush on write error since the offset and len might
1334 * not have been updated which could create an infinite loop below when
1335 * waiting for the metadata cache to be flushed.
1336 */
a32bd775 1337 goto end_free;
b1316da1
JG
1338 default:
1339 abort();
331744e3 1340 }
331744e3 1341
94d49140
JD
1342 if (!wait) {
1343 goto end_free;
1344 }
5e41ebe1 1345 while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
331744e3 1346 DBG("Waiting for metadata to be flushed");
9ce5646a
MD
1347
1348 health_code_update();
1349
331744e3
JD
1350 usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
1351 }
1352
1353end_free:
1354 free(metadata_str);
1355end:
1356 return ret_code;
1357}
1358
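/*
 * Cache write outcomes handled above (summary sketch): NO_CHANGE means the
 * bytes were already present and nothing is done; APPENDED_CONTENT and
 * INVALIDATED both wake the metadata poll thread, INVALIDATED additionally
 * rewinding the stream's consumed position; ERROR skips the flush wait so
 * the function cannot loop forever on an offset/len that was never applied.
 */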
4cbc1a04
DG
1359/*
1360 * Receive command from session daemon and process it.
1361 *
1362 * Return 1 on success, 0 on an orderly shutdown, or a negative value on error.
1363 */
3bd1e081 1364int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
28ab034a
JG
1365 int sock,
1366 struct pollfd *consumer_sockpoll)
3bd1e081 1367{
594c7c00 1368 int ret_func;
0c759fc9 1369 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3bd1e081 1370 struct lttcomm_consumer_msg msg;
ffe60014 1371 struct lttng_consumer_channel *channel = NULL;
3bd1e081 1372
9ce5646a
MD
1373 health_code_update();
1374
594c7c00
SM
1375 {
1376 ssize_t ret_recv;
1377
1378 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
1379 if (ret_recv != sizeof(msg)) {
1380 DBG("Consumer received unexpected message size %zd (expects %zu)",
28ab034a
JG
1381 ret_recv,
1382 sizeof(msg));
594c7c00
SM
1383 /*
1384 * The ret value might be 0, meaning an orderly shutdown, but this is OK
1385 * since the caller handles this.
1386 */
1387 if (ret_recv > 0) {
28ab034a 1388 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
594c7c00
SM
1389 ret_recv = -1;
1390 }
1391 return ret_recv;
489f70e9 1392 }
3bd1e081 1393 }
9ce5646a
MD
1394
1395 health_code_update();
1396
84382d49 1397 /* deprecated */
a0377dfe 1398 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
3bd1e081 1399
9ce5646a
MD
1400 health_code_update();
1401
3f8e211f 1402 /* relayd needs RCU read-side lock */
b0b335c8
MD
1403 rcu_read_lock();
1404
3bd1e081 1405 switch (msg.cmd_type) {
00e2e675
DG
1406 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
1407 {
4222116f
JR
1408 uint32_t major = msg.u.relayd_sock.major;
1409 uint32_t minor = msg.u.relayd_sock.minor;
1410 enum lttcomm_sock_proto protocol =
28ab034a 1411 (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
4222116f 1412
f50f23d9 1413 /* Session daemon status message are handled in the following call. */
2527bf85 1414 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
28ab034a
JG
1415 msg.u.relayd_sock.type,
1416 ctx,
1417 sock,
1418 consumer_sockpoll,
1419 msg.u.relayd_sock.session_id,
1420 msg.u.relayd_sock.relayd_session_id,
1421 major,
1422 minor,
1423 protocol);
00e2e675
DG
1424 goto end_nosignal;
1425 }
173af62f
DG
1426 case LTTNG_CONSUMER_DESTROY_RELAYD:
1427 {
a6ba4fe1 1428 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
173af62f
DG
1429 struct consumer_relayd_sock_pair *relayd;
1430
a6ba4fe1 1431 DBG("UST consumer destroying relayd %" PRIu64, index);
173af62f
DG
1432
1433 /* Get relayd reference if exists. */
a6ba4fe1 1434 relayd = consumer_find_relayd(index);
173af62f 1435 if (relayd == NULL) {
3448e266 1436 DBG("Unable to find relayd %" PRIu64, index);
e462382a 1437 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
173af62f
DG
1438 }
1439
a6ba4fe1
DG
1440 /*
1441 * Each relayd socket pair has a refcount of stream attached to it
1442 * which tells if the relayd is still active or not depending on the
1443 * refcount value.
1444 *
1445 * This will set the destroy flag of the relayd object and destroy it
1446 * if the refcount reaches zero when called.
1447 *
1448 * The destroy can happen either here or when a stream fd hangs up.
1449 */
f50f23d9
DG
1450 if (relayd) {
1451 consumer_flag_relayd_for_destroy(relayd);
1452 }
1453
d88aee68 1454 goto end_msg_sessiond;
173af62f 1455 }
3bd1e081
MD
1456 case LTTNG_CONSUMER_UPDATE_STREAM:
1457 {
3f8e211f 1458 rcu_read_unlock();
7ad0a0cb 1459 return -ENOSYS;
3bd1e081 1460 }
6d805429 1461 case LTTNG_CONSUMER_DATA_PENDING:
53632229 1462 {
594c7c00
SM
1463 int is_data_pending;
1464 ssize_t ret_send;
6d805429 1465 uint64_t id = msg.u.data_pending.session_id;
ca22feea 1466
6d805429 1467 DBG("UST consumer data pending command for id %" PRIu64, id);
ca22feea 1468
3be74084 1469 is_data_pending = consumer_data_pending(id);
ca22feea
DG
1470
1471 /* Send back returned value to session daemon */
28ab034a 1472 ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, sizeof(is_data_pending));
594c7c00 1473 if (ret_send < 0) {
28ab034a 1474 DBG("Error when sending the data pending ret code: %zd", ret_send);
489f70e9 1475 goto error_fatal;
ca22feea 1476 }
f50f23d9
DG
1477
1478 /*
1479 * No need to send back a status message since the data pending
1480 * returned value is the response.
1481 */
ca22feea 1482 break;
53632229 1483 }
ffe60014
DG
1484 case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
1485 {
594c7c00 1486 int ret_ask_channel, ret_add_channel, ret_send;
b623cb6a 1487 struct lttng_ust_ctl_consumer_channel_attr attr;
d2956687
JG
1488 const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value;
1489 const struct lttng_credentials buffer_credentials = {
ff588497
JR
1490 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.uid),
1491 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.gid),
d2956687 1492 };
ffe60014
DG
1493
1494 /* Create a plain object and reserve a channel key. */
a2814ea7 1495 channel = consumer_allocate_channel(
28ab034a
JG
1496 msg.u.ask_channel.key,
1497 msg.u.ask_channel.session_id,
1498 msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL,
1499 msg.u.ask_channel.pathname,
1500 msg.u.ask_channel.name,
1501 msg.u.ask_channel.relayd_id,
1502 (enum lttng_event_output) msg.u.ask_channel.output,
1503 msg.u.ask_channel.tracefile_size,
1504 msg.u.ask_channel.tracefile_count,
1505 msg.u.ask_channel.session_id_per_pid,
1506 msg.u.ask_channel.monitor,
1507 msg.u.ask_channel.live_timer_interval,
1508 msg.u.ask_channel.is_live,
1509 msg.u.ask_channel.root_shm_path,
1510 msg.u.ask_channel.shm_path);
ffe60014
DG
1511 if (!channel) {
1512 goto end_channel_error;
1513 }
1514
28ab034a 1515 LTTNG_OPTIONAL_SET(&channel->buffer_credentials, buffer_credentials);
d2956687 1516
567eb353
DG
1517 /*
1518 * Assign UST application UID to the channel. This value is ignored for
1519 * per PID buffers. This is specific to UST thus setting this after the
1520 * allocation.
1521 */
1522 channel->ust_app_uid = msg.u.ask_channel.ust_app_uid;
1523
ffe60014
DG
1524 /* Build channel attributes from received message. */
1525 attr.subbuf_size = msg.u.ask_channel.subbuf_size;
1526 attr.num_subbuf = msg.u.ask_channel.num_subbuf;
1527 attr.overwrite = msg.u.ask_channel.overwrite;
1528 attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
1529 attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
7972aab2 1530 attr.chan_id = msg.u.ask_channel.chan_id;
ffe60014 1531 memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
28ab034a 1532 attr.blocking_timeout = msg.u.ask_channel.blocking_timeout;
ffe60014 1533
0c759fc9
DG
1534 /* Match channel buffer type to the UST abi. */
1535 switch (msg.u.ask_channel.output) {
1536 case LTTNG_EVENT_MMAP:
1537 default:
fc4b93fa 1538 attr.output = LTTNG_UST_ABI_MMAP;
0c759fc9
DG
1539 break;
1540 }
1541
ffe60014
DG
1542 /* Translate and save channel type. */
1543 switch (msg.u.ask_channel.type) {
fc4b93fa 1544 case LTTNG_UST_ABI_CHAN_PER_CPU:
ffe60014 1545 channel->type = CONSUMER_CHANNEL_TYPE_DATA;
fc4b93fa 1546 attr.type = LTTNG_UST_ABI_CHAN_PER_CPU;
8633d6e3
MD
1547 /*
1548 * Set refcount to 1 for owner. Below, we will
1549 * pass ownership to the
1550 * consumer_thread_channel_poll() thread.
1551 */
1552 channel->refcount = 1;
ffe60014 1553 break;
fc4b93fa 1554 case LTTNG_UST_ABI_CHAN_METADATA:
ffe60014 1555 channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
fc4b93fa 1556 attr.type = LTTNG_UST_ABI_CHAN_METADATA;
ffe60014
DG
1557 break;
1558 default:
a0377dfe 1559 abort();
ffe60014
DG
1560 goto error_fatal;
1561 };
1562
9ce5646a
MD
1563 health_code_update();
1564
594c7c00
SM
1565 ret_ask_channel = ask_channel(ctx, channel, &attr);
1566 if (ret_ask_channel < 0) {
ffe60014
DG
1567 goto end_channel_error;
1568 }
1569
fc4b93fa 1570 if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) {
594c7c00
SM
1571 int ret_allocate;
1572
28ab034a 1573 ret_allocate = consumer_metadata_cache_allocate(channel);
594c7c00 1574 if (ret_allocate < 0) {
fc643247
MD
1575 ERR("Allocating metadata cache");
1576 goto end_channel_error;
1577 }
1578 consumer_timer_switch_start(channel, attr.switch_timer_interval);
1579 attr.switch_timer_interval = 0;
94d49140 1580 } else {
e9404c27
JG
1581 int monitor_start_ret;
1582
28ab034a 1583 consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval);
e9404c27 1584 monitor_start_ret = consumer_timer_monitor_start(
28ab034a 1585 channel, msg.u.ask_channel.monitor_timer_interval);
e9404c27
JG
1586 if (monitor_start_ret < 0) {
1587 ERR("Starting channel monitoring timer failed");
1588 goto end_channel_error;
1589 }
fc643247
MD
1590 }
1591
9ce5646a
MD
1592 health_code_update();
1593
ffe60014
DG
1594 /*
1595 * Add the channel to the internal state AFTER all streams were created
1596 * and successfully sent to session daemon. This way, all streams must
1597 * be ready before this channel is visible to the threads.
fc643247
MD
1598 * If add_channel succeeds, ownership of the channel is
1599 * passed to consumer_thread_channel_poll().
ffe60014 1600 */
594c7c00
SM
1601 ret_add_channel = add_channel(channel, ctx);
1602 if (ret_add_channel < 0) {
fc4b93fa 1603 if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) {
ea88ca2a
MD
1604 if (channel->switch_timer_enabled == 1) {
1605 consumer_timer_switch_stop(channel);
1606 }
1607 consumer_metadata_cache_destroy(channel);
1608 }
d3e2ba59
JD
1609 if (channel->live_timer_enabled == 1) {
1610 consumer_timer_live_stop(channel);
1611 }
e9404c27
JG
1612 if (channel->monitor_timer_enabled == 1) {
1613 consumer_timer_monitor_stop(channel);
1614 }
ffe60014
DG
1615 goto end_channel_error;
1616 }
1617
9ce5646a
MD
1618 health_code_update();
1619
ffe60014
DG
1620 /*
1621 * Channel and streams are now created. Inform the session daemon that
1622 * everything went well and should wait to receive the channel and
1623 * streams with ustctl API.
1624 */
594c7c00
SM
1625 ret_send = consumer_send_status_channel(sock, channel);
1626 if (ret_send < 0) {
ffe60014 1627 /*
489f70e9 1628 * There is probably a problem on the socket.
ffe60014 1629 */
489f70e9 1630 goto error_fatal;
ffe60014
DG
1631 }
1632
1633 break;
1634 }
1635 case LTTNG_CONSUMER_GET_CHANNEL:
1636 {
1637 int ret, relayd_err = 0;
d88aee68 1638 uint64_t key = msg.u.get_channel.key;
594c7c00 1639 struct lttng_consumer_channel *found_channel;
ffe60014 1640
594c7c00
SM
1641 found_channel = consumer_find_channel(key);
1642 if (!found_channel) {
8fd623e0 1643 ERR("UST consumer get channel key %" PRIu64 " not found", key);
e462382a 1644 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
f3a1fc7b 1645 goto end_get_channel;
ffe60014
DG
1646 }
1647
9ce5646a
MD
1648 health_code_update();
1649
a3a86f35 1650 /* Send the channel to sessiond (and relayd, if applicable). */
28ab034a 1651 ret = send_channel_to_sessiond_and_relayd(sock, found_channel, ctx, &relayd_err);
ffe60014
DG
1652 if (ret < 0) {
1653 if (relayd_err) {
1654 /*
1655 * We were unable to send to the relayd the stream so avoid
1656 * sending back a fatal error to the thread since this is OK
f2a444f1
DG
1657 * and the consumer can continue its work. The above call
1658 * has sent the error status message to the sessiond.
ffe60014 1659 */
f3a1fc7b 1660 goto end_get_channel_nosignal;
ffe60014
DG
1661 }
1662 /*
1663 * The communication was broken, hence there is a bad state between
1664 * the consumer and sessiond, so stop everything.
1665 */
f3a1fc7b 1666 goto error_get_channel_fatal;
ffe60014
DG
1667 }
1668
9ce5646a
MD
1669 health_code_update();
1670
10a50311
JD
1671 /*
1672 * In no monitor mode, the streams ownership is kept inside the channel
1673 * so don't send them to the data thread.
1674 */
594c7c00 1675 if (!found_channel->monitor) {
f3a1fc7b 1676 goto end_get_channel;
10a50311
JD
1677 }
1678
594c7c00 1679 ret = send_streams_to_thread(found_channel, ctx);
d88aee68
DG
1680 if (ret < 0) {
1681 /*
1682 * If we are unable to send the stream to the thread, there is
1683 * a big problem so just stop everything.
1684 */
f3a1fc7b 1685 goto error_get_channel_fatal;
ffe60014 1686 }
ffe60014 1687 /* List MUST be empty after or else it could be reused. */
a0377dfe 1688 LTTNG_ASSERT(cds_list_empty(&found_channel->streams.head));
28ab034a 1689 end_get_channel:
d88aee68 1690 goto end_msg_sessiond;
28ab034a 1691 error_get_channel_fatal:
f3a1fc7b 1692 goto error_fatal;
28ab034a 1693 end_get_channel_nosignal:
f3a1fc7b 1694 goto end_nosignal;
d88aee68
DG
1695 }
1696 case LTTNG_CONSUMER_DESTROY_CHANNEL:
1697 {
1698 uint64_t key = msg.u.destroy_channel.key;
d88aee68 1699
a0cbdd2e
MD
1700 /*
1701 * Only called if streams have not been sent to the stream
1702 * manager thread. However, the channel has been sent to
1703 * the channel manager thread.
1704 */
1705 notify_thread_del_channel(ctx, key);
d88aee68 1706 goto end_msg_sessiond;
ffe60014 1707 }
d88aee68
DG
1708 case LTTNG_CONSUMER_CLOSE_METADATA:
1709 {
1710 int ret;
1711
1712 ret = close_metadata(msg.u.close_metadata.key);
1713 if (ret != 0) {
97535efa 1714 ret_code = (lttcomm_return_code) ret;
d88aee68
DG
1715 }
1716
1717 goto end_msg_sessiond;
1718 }
7972aab2
DG
1719 case LTTNG_CONSUMER_FLUSH_CHANNEL:
1720 {
1721 int ret;
1722
1723 ret = flush_channel(msg.u.flush_channel.key);
1724 if (ret != 0) {
97535efa 1725 ret_code = (lttcomm_return_code) ret;
7972aab2
DG
1726 }
1727
1728 goto end_msg_sessiond;
1729 }
0dd01979
MD
1730 case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
1731 {
1732 int ret;
1733
28ab034a 1734 ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key);
0dd01979 1735 if (ret != 0) {
97535efa 1736 ret_code = (lttcomm_return_code) ret;
0dd01979
MD
1737 }
1738
1739 goto end_msg_sessiond;
1740 }
d88aee68 1741 case LTTNG_CONSUMER_PUSH_METADATA:
ffe60014
DG
1742 {
1743 int ret;
d88aee68 1744 uint64_t len = msg.u.push_metadata.len;
d88aee68 1745 uint64_t key = msg.u.push_metadata.key;
331744e3 1746 uint64_t offset = msg.u.push_metadata.target_offset;
93ec662e 1747 uint64_t version = msg.u.push_metadata.version;
594c7c00 1748 struct lttng_consumer_channel *found_channel;
ffe60014 1749
28ab034a 1750 DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
ffe60014 1751
594c7c00
SM
1752 found_channel = consumer_find_channel(key);
1753 if (!found_channel) {
000baf6a
DG
1754 /*
1755 * This is possible if the metadata creation on the consumer side
1756 * is in flight vis-a-vis a concurrent push metadata from the
1757 * session daemon. Simply return that the channel failed; the
1758 * session daemon will handle that message correctly considering
1759 * that this race is acceptable, thus the DBG() statement here.
1760 */
1761 DBG("UST consumer push metadata %" PRIu64 " not found", key);
1762 ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
a8ffe244 1763 goto end_push_metadata_msg_sessiond;
d88aee68
DG
1764 }
1765
9ce5646a
MD
1766 health_code_update();
1767
c585821b
MD
1768 if (!len) {
1769 /*
1770 * There is nothing to receive. We have simply
1771 * checked whether the channel can be found.
1772 */
1773 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
a8ffe244 1774 goto end_push_metadata_msg_sessiond;
c585821b
MD
1775 }
1776
d88aee68 1777 /* Tell session daemon we are ready to receive the metadata. */
0c759fc9 1778 ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
ffe60014
DG
1779 if (ret < 0) {
1780 /* Somehow, the session daemon is not responding anymore. */
a8ffe244 1781 goto error_push_metadata_fatal;
d88aee68
DG
1782 }
1783
9ce5646a
MD
1784 health_code_update();
1785
d88aee68 1786 /* Wait for more data. */
9ce5646a
MD
1787 health_poll_entry();
1788 ret = lttng_consumer_poll_socket(consumer_sockpoll);
1789 health_poll_exit();
84382d49 1790 if (ret) {
a8ffe244 1791 goto error_push_metadata_fatal;
d88aee68
DG
1792 }
1793
9ce5646a
MD
1794 health_code_update();
1795
28ab034a
JG
1796 ret = lttng_ustconsumer_recv_metadata(
1797 sock, key, offset, len, version, found_channel, 0, 1);
d88aee68 1798 if (ret < 0) {
331744e3 1799 /* error receiving from sessiond */
a8ffe244 1800 goto error_push_metadata_fatal;
331744e3 1801 } else {
97535efa 1802 ret_code = (lttcomm_return_code) ret;
a8ffe244 1803 goto end_push_metadata_msg_sessiond;
d88aee68 1804 }
28ab034a 1805 end_push_metadata_msg_sessiond:
a8ffe244 1806 goto end_msg_sessiond;
28ab034a 1807 error_push_metadata_fatal:
a8ffe244 1808 goto error_fatal;
d88aee68
DG
1809 }
1810 case LTTNG_CONSUMER_SETUP_METADATA:
1811 {
1812 int ret;
1813
1814 ret = setup_metadata(ctx, msg.u.setup_metadata.key);
1815 if (ret) {
97535efa 1816 ret_code = (lttcomm_return_code) ret;
d88aee68
DG
1817 }
1818 goto end_msg_sessiond;
ffe60014 1819 }
6dc3064a
DG
1820 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
1821 {
594c7c00 1822 struct lttng_consumer_channel *found_channel;
3eb928aa 1823 uint64_t key = msg.u.snapshot_channel.key;
594c7c00 1824 int ret_send;
3eb928aa 1825
594c7c00
SM
1826 found_channel = consumer_find_channel(key);
1827 if (!found_channel) {
3eb928aa
MD
1828 DBG("UST snapshot channel not found for key %" PRIu64, key);
1829 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
10a50311 1830 } else {
3eb928aa 1831 if (msg.u.snapshot_channel.metadata) {
594c7c00
SM
1832 int ret_snapshot;
1833
1834 ret_snapshot = snapshot_metadata(found_channel,
28ab034a
JG
1835 key,
1836 msg.u.snapshot_channel.pathname,
1837 msg.u.snapshot_channel.relayd_id,
1838 ctx);
594c7c00 1839 if (ret_snapshot < 0) {
3eb928aa
MD
1840 ERR("Snapshot metadata failed");
1841 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
1842 }
1843 } else {
594c7c00
SM
1844 int ret_snapshot;
1845
28ab034a
JG
1846 ret_snapshot = snapshot_channel(
1847 found_channel,
1848 key,
1849 msg.u.snapshot_channel.pathname,
1850 msg.u.snapshot_channel.relayd_id,
1851 msg.u.snapshot_channel.nb_packets_per_stream,
1852 ctx);
594c7c00 1853 if (ret_snapshot < 0) {
3eb928aa
MD
1854 ERR("Snapshot channel failed");
1855 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
1856 }
10a50311
JD
1857 }
1858 }
9ce5646a 1859 health_code_update();
594c7c00
SM
1860 ret_send = consumer_send_status_msg(sock, ret_code);
1861 if (ret_send < 0) {
6dc3064a
DG
1862 /* Somehow, the session daemon is not responding anymore. */
1863 goto end_nosignal;
1864 }
9ce5646a 1865 health_code_update();
6dc3064a
DG
1866 break;
1867 }
fb83fe64
JD
1868 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1869 {
beb59458
MJ
1870 int ret = 0;
1871 uint64_t discarded_events;
fb83fe64
JD
1872 struct lttng_ht_iter iter;
1873 struct lttng_ht *ht;
1874 struct lttng_consumer_stream *stream;
1875 uint64_t id = msg.u.discarded_events.session_id;
1876 uint64_t key = msg.u.discarded_events.channel_key;
1877
28ab034a 1878 DBG("UST consumer discarded events command for session id %" PRIu64, id);
fb83fe64 1879 rcu_read_lock();
fa29bfbf 1880 pthread_mutex_lock(&the_consumer_data.lock);
fb83fe64 1881
fa29bfbf 1882 ht = the_consumer_data.stream_list_ht;
fb83fe64
JD
1883
1884 /*
1885 * We only need a reference to the channel, but they are not
1886 * directly indexed, so we just use the first matching stream
1887 * to extract the information we need; we default to 0 if not
1888 * found (no events are dropped if the channel is not yet in
1889 * use).
1890 */
beb59458 1891 discarded_events = 0;
fb83fe64 1892 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
1893 ht->hash_fct(&id, lttng_ht_seed),
1894 ht->match_fct,
1895 &id,
1896 &iter.iter,
1897 stream,
1898 node_session_id.node)
1899 {
fb83fe64 1900 if (stream->chan->key == key) {
beb59458 1901 discarded_events = stream->chan->discarded_events;
fb83fe64
JD
1902 break;
1903 }
1904 }
fa29bfbf 1905 pthread_mutex_unlock(&the_consumer_data.lock);
fb83fe64
JD
1906 rcu_read_unlock();
1907
28ab034a
JG
1908 DBG("UST consumer discarded events command for session id %" PRIu64
1909 ", channel key %" PRIu64,
1910 id,
1911 key);
fb83fe64
JD
1912
1913 health_code_update();
1914
1915 /* Send back returned value to session daemon */
beb59458 1916 ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
fb83fe64
JD
1917 if (ret < 0) {
1918 PERROR("send discarded events");
1919 goto error_fatal;
1920 }
1921
1922 break;
1923 }
1924 case LTTNG_CONSUMER_LOST_PACKETS:
1925 {
28ab034a 1926 int ret;
9a06e8d4 1927 uint64_t lost_packets;
fb83fe64
JD
1928 struct lttng_ht_iter iter;
1929 struct lttng_ht *ht;
1930 struct lttng_consumer_stream *stream;
1931 uint64_t id = msg.u.lost_packets.session_id;
1932 uint64_t key = msg.u.lost_packets.channel_key;
1933
28ab034a 1934 DBG("UST consumer lost packets command for session id %" PRIu64, id);
fb83fe64 1935 rcu_read_lock();
fa29bfbf 1936 pthread_mutex_lock(&the_consumer_data.lock);
fb83fe64 1937
fa29bfbf 1938 ht = the_consumer_data.stream_list_ht;
fb83fe64
JD
1939
1940 /*
1941 * We only need a reference to the channel, but they are not
1942 * directly indexed, so we just use the first matching stream
1943 * to extract the information we need; we default to 0 if not
1944 * found (no packets lost if the channel is not yet in use).
1945 */
28ab034a 1946 lost_packets = 0;
fb83fe64 1947 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
1948 ht->hash_fct(&id, lttng_ht_seed),
1949 ht->match_fct,
1950 &id,
1951 &iter.iter,
1952 stream,
1953 node_session_id.node)
1954 {
fb83fe64 1955 if (stream->chan->key == key) {
28ab034a 1956 lost_packets = stream->chan->lost_packets;
fb83fe64
JD
1957 break;
1958 }
1959 }
fa29bfbf 1960 pthread_mutex_unlock(&the_consumer_data.lock);
fb83fe64
JD
1961 rcu_read_unlock();
1962
28ab034a
JG
1963 DBG("UST consumer lost packets command for session id %" PRIu64
1964 ", channel key %" PRIu64,
1965 id,
1966 key);
fb83fe64
JD
1967
1968 health_code_update();
1969
1970 /* Send back returned value to session daemon */
28ab034a 1971 ret = lttcomm_send_unix_sock(sock, &lost_packets, sizeof(lost_packets));
fb83fe64
JD
1972 if (ret < 0) {
1973 PERROR("send lost packets");
1974 goto error_fatal;
1975 }
1976
1977 break;
1978 }
b3530820
JG
1979 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1980 {
28ab034a 1981 int channel_monitor_pipe, ret_send, ret_set_channel_monitor_pipe;
594c7c00 1982 ssize_t ret_recv;
b3530820
JG
1983
1984 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1985 /* Successfully received the command's type. */
594c7c00
SM
1986 ret_send = consumer_send_status_msg(sock, ret_code);
1987 if (ret_send < 0) {
b3530820
JG
1988 goto error_fatal;
1989 }
1990
28ab034a 1991 ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1);
594c7c00 1992 if (ret_recv != sizeof(channel_monitor_pipe)) {
b3530820
JG
1993 ERR("Failed to receive channel monitor pipe");
1994 goto error_fatal;
1995 }
1996
1997 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
594c7c00 1998 ret_set_channel_monitor_pipe =
28ab034a 1999 consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe);
594c7c00 2000 if (!ret_set_channel_monitor_pipe) {
b3530820 2001 int flags;
594c7c00 2002 int ret_fcntl;
b3530820
JG
2003
2004 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
2005 /* Set the pipe as non-blocking. */
594c7c00
SM
2006 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
2007 if (ret_fcntl == -1) {
b3530820
JG
2008 PERROR("fcntl get flags of the channel monitoring pipe");
2009 goto error_fatal;
2010 }
594c7c00 2011 flags = ret_fcntl;
b3530820 2012
28ab034a 2013 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK);
594c7c00 2014 if (ret_fcntl == -1) {
b3530820
JG
2015 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
2016 goto error_fatal;
2017 }
2018 DBG("Channel monitor pipe set as non-blocking");
2019 } else {
2020 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
2021 }
2022 goto end_msg_sessiond;
2023 }
b99a8d42
JD
2024 case LTTNG_CONSUMER_ROTATE_CHANNEL:
2025 {
594c7c00 2026 struct lttng_consumer_channel *found_channel;
92b7a7f8 2027 uint64_t key = msg.u.rotate_channel.key;
594c7c00 2028 int ret_send_status;
b99a8d42 2029
594c7c00
SM
2030 found_channel = consumer_find_channel(key);
2031 if (!found_channel) {
92b7a7f8
MD
2032 DBG("Channel %" PRIu64 " not found", key);
2033 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
2034 } else {
594c7c00
SM
2035 int rotate_channel;
2036
92b7a7f8
MD
2037 /*
2038 * Sample the rotate position of all the streams in
2039 * this channel.
2040 */
594c7c00 2041 rotate_channel = lttng_consumer_rotate_channel(
28ab034a 2042 found_channel, key, msg.u.rotate_channel.relayd_id);
594c7c00 2043 if (rotate_channel < 0) {
92b7a7f8
MD
2044 ERR("Rotate channel failed");
2045 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
2046 }
b99a8d42 2047
92b7a7f8
MD
2048 health_code_update();
2049 }
594c7c00
SM
2050
2051 ret_send_status = consumer_send_status_msg(sock, ret_code);
2052 if (ret_send_status < 0) {
b99a8d42 2053 /* Somehow, the session daemon is not responding anymore. */
41c7a76d 2054 goto end_rotate_channel_nosignal;
b99a8d42
JD
2055 }
2056
2057 /*
2058 * Rotate the streams that are ready right now.
2059 * FIXME: this is a second consecutive iteration over the
2060 * streams in a channel; there is probably a better way to
2061 * handle this, but it needs to be after the
2062 * consumer_send_status_msg() call.
2063 */
594c7c00
SM
2064 if (found_channel) {
2065 int ret_rotate_read_streams;
2066
2067 ret_rotate_read_streams =
28ab034a 2068 lttng_consumer_rotate_ready_streams(found_channel, key);
594c7c00 2069 if (ret_rotate_read_streams < 0) {
92b7a7f8
MD
2070 ERR("Rotate channel failed");
2071 }
b99a8d42
JD
2072 }
2073 break;
28ab034a 2074 end_rotate_channel_nosignal:
41c7a76d 2075 goto end_nosignal;
b99a8d42 2076 }
5f3aff8b
MD
2077 case LTTNG_CONSUMER_CLEAR_CHANNEL:
2078 {
594c7c00 2079 struct lttng_consumer_channel *found_channel;
5f3aff8b 2080 uint64_t key = msg.u.clear_channel.key;
594c7c00 2081 int ret_send_status;
5f3aff8b 2082
594c7c00
SM
2083 found_channel = consumer_find_channel(key);
2084 if (!found_channel) {
5f3aff8b
MD
2085 DBG("Channel %" PRIu64 " not found", key);
2086 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
2087 } else {
594c7c00
SM
2088 int ret_clear_channel;
2089
28ab034a 2090 ret_clear_channel = lttng_consumer_clear_channel(found_channel);
594c7c00 2091 if (ret_clear_channel) {
5f3aff8b 2092 ERR("Clear channel failed key %" PRIu64, key);
97535efa 2093 ret_code = (lttcomm_return_code) ret_clear_channel;
5f3aff8b
MD
2094 }
2095
2096 health_code_update();
2097 }
594c7c00
SM
2098 ret_send_status = consumer_send_status_msg(sock, ret_code);
2099 if (ret_send_status < 0) {
5f3aff8b
MD
2100 /* Somehow, the session daemon is not responding anymore. */
2101 goto end_nosignal;
2102 }
2103 break;
2104 }
d2956687 2105 case LTTNG_CONSUMER_INIT:
00fb02ac 2106 {
594c7c00 2107 int ret_send_status;
328c2fe7 2108 lttng_uuid sessiond_uuid;
594c7c00 2109
28ab034a
JG
2110 std::copy(std::begin(msg.u.init.sessiond_uuid),
2111 std::end(msg.u.init.sessiond_uuid),
2112 sessiond_uuid.begin());
328c2fe7 2113 ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
d88744a4 2114 health_code_update();
594c7c00
SM
2115 ret_send_status = consumer_send_status_msg(sock, ret_code);
2116 if (ret_send_status < 0) {
d88744a4
JD
2117 /* Somehow, the session daemon is not responding anymore. */
2118 goto end_nosignal;
2119 }
2120 break;
2121 }
d2956687 2122 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
d88744a4 2123 {
d2956687 2124 const struct lttng_credentials credentials = {
28ab034a
JG
2125 .uid = LTTNG_OPTIONAL_INIT_VALUE(
2126 msg.u.create_trace_chunk.credentials.value.uid),
2127 .gid = LTTNG_OPTIONAL_INIT_VALUE(
2128 msg.u.create_trace_chunk.credentials.value.gid),
d2956687 2129 };
28ab034a
JG
2130 const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set;
2131 const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
2132 const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
2133 msg.u.create_trace_chunk.override_name :
2134 NULL;
cbf53d23 2135 struct lttng_directory_handle *chunk_directory_handle = NULL;
d88744a4 2136
d2956687
JG
2137 /*
2138 * The session daemon will only provide a chunk directory file
2139 * descriptor for local traces.
2140 */
2141 if (is_local_trace) {
2142 int chunk_dirfd;
594c7c00
SM
2143 int ret_send_status;
2144 ssize_t ret_recv;
19990ed5 2145
d2956687 2146 /* Acknowledge the reception of the command. */
28ab034a 2147 ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
594c7c00 2148 if (ret_send_status < 0) {
d2956687
JG
2149 /* Somehow, the session daemon is not responding anymore. */
2150 goto end_nosignal;
2151 }
92816cc3 2152
5da88b0f
MD
2153 /*
2154 * Receive trace chunk domain dirfd.
2155 */
28ab034a 2156 ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
594c7c00 2157 if (ret_recv != sizeof(chunk_dirfd)) {
5da88b0f 2158 ERR("Failed to receive trace chunk domain directory file descriptor");
d2956687
JG
2159 goto error_fatal;
2160 }
92816cc3 2161
28ab034a
JG
2162 DBG("Received trace chunk domain directory fd (%d)", chunk_dirfd);
2163 chunk_directory_handle =
2164 lttng_directory_handle_create_from_dirfd(chunk_dirfd);
cbf53d23 2165 if (!chunk_directory_handle) {
5da88b0f 2166 ERR("Failed to initialize chunk domain directory handle from directory file descriptor");
d2956687
JG
2167 if (close(chunk_dirfd)) {
2168 PERROR("Failed to close chunk directory file descriptor");
2169 }
2170 goto error_fatal;
2171 }
92816cc3
JG
2172 }
2173
d2956687 2174 ret_code = lttng_consumer_create_trace_chunk(
28ab034a
JG
2175 !is_local_trace ? &relayd_id : NULL,
2176 msg.u.create_trace_chunk.session_id,
2177 msg.u.create_trace_chunk.chunk_id,
2178 (time_t) msg.u.create_trace_chunk.creation_timestamp,
2179 chunk_override_name,
2180 msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL,
2181 chunk_directory_handle);
cbf53d23 2182 lttng_directory_handle_put(chunk_directory_handle);
d2956687 2183 goto end_msg_sessiond;
00fb02ac 2184 }
d2956687 2185 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
a1ae2ea5 2186 {
bbc4768c 2187 enum lttng_trace_chunk_command_type close_command =
28ab034a
JG
2188 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
2189 const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value;
ecd1a12f 2190 struct lttcomm_consumer_close_trace_chunk_reply reply;
d00fb490 2191 char closed_trace_chunk_path[LTTNG_PATH_MAX] = {};
ecd1a12f 2192 int ret;
d2956687
JG
2193
2194 ret_code = lttng_consumer_close_trace_chunk(
28ab034a
JG
2195 msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL,
2196 msg.u.close_trace_chunk.session_id,
2197 msg.u.close_trace_chunk.chunk_id,
2198 (time_t) msg.u.close_trace_chunk.close_timestamp,
2199 msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL,
2200 closed_trace_chunk_path);
ecd1a12f
MD
2201 reply.ret_code = ret_code;
2202 reply.path_length = strlen(closed_trace_chunk_path) + 1;
2203 ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
2204 if (ret != sizeof(reply)) {
2205 goto error_fatal;
2206 }
28ab034a 2207 ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, reply.path_length);
ecd1a12f
MD
2208 if (ret != reply.path_length) {
2209 goto error_fatal;
2210 }
2211 goto end_nosignal;
a1ae2ea5 2212 }
d2956687 2213 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
3654ed19 2214 {
28ab034a 2215 const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
d2956687
JG
2216
2217 ret_code = lttng_consumer_trace_chunk_exists(
28ab034a
JG
2218 msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL,
2219 msg.u.trace_chunk_exists.session_id,
2220 msg.u.trace_chunk_exists.chunk_id);
d2956687 2221 goto end_msg_sessiond;
04ed9e10
JG
2222 }
2223 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
2224 {
2225 const uint64_t key = msg.u.open_channel_packets.key;
28ab034a 2226 struct lttng_consumer_channel *found_channel = consumer_find_channel(key);
04ed9e10 2227
594c7c00
SM
2228 if (found_channel) {
2229 pthread_mutex_lock(&found_channel->lock);
28ab034a 2230 ret_code = lttng_consumer_open_channel_packets(found_channel);
594c7c00 2231 pthread_mutex_unlock(&found_channel->lock);
04ed9e10
JG
2232 } else {
2233 /*
2234 * The channel could have disappeared in per-pid
2235 * buffering mode.
2236 */
2237 DBG("Channel %" PRIu64 " not found", key);
2238 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
2239 }
2240
2241 health_code_update();
2242 goto end_msg_sessiond;
3654ed19 2243 }
3bd1e081
MD
2244 default:
2245 break;
2246 }
3f8e211f 2247
3bd1e081 2248end_nosignal:
4cbc1a04
DG
2249 /*
2250 * Return 1 to indicate success since the 0 value can be a socket
2251 * shutdown during the recv() or send() call.
2252 */
594c7c00 2253 ret_func = 1;
f3a1fc7b 2254 goto end;
ffe60014
DG
2255
2256end_msg_sessiond:
2257 /*
2258 * The returned value here is not useful since either way we'll return 1 to
2259 * the caller because the session daemon socket management is done
2260 * elsewhere. Returning a negative code or 0 will shut down the consumer.
2261 */
594c7c00
SM
2262 {
2263 int ret_send_status;
2264
2265 ret_send_status = consumer_send_status_msg(sock, ret_code);
2266 if (ret_send_status < 0) {
2267 goto error_fatal;
2268 }
489f70e9 2269 }
594c7c00
SM
2270
2271 ret_func = 1;
f3a1fc7b 2272 goto end;
9ce5646a 2273
ffe60014
DG
2274end_channel_error:
2275 if (channel) {
a00932c8 2276 consumer_del_channel(channel);
ffe60014
DG
2277 }
2278 /* We have to send a status channel message indicating an error. */
594c7c00
SM
2279 {
2280 int ret_send_status;
2281
2282 ret_send_status = consumer_send_status_channel(sock, NULL);
2283 if (ret_send_status < 0) {
2284 /* Stop everything if session daemon can not be notified. */
2285 goto error_fatal;
2286 }
ffe60014 2287 }
594c7c00
SM
2288
2289 ret_func = 1;
f3a1fc7b 2290 goto end;
9ce5646a 2291
ffe60014 2292error_fatal:
ffe60014 2293 /* This will issue a consumer stop. */
594c7c00 2294 ret_func = -1;
f3a1fc7b
JG
2295 goto end;
2296
2297end:
2298 rcu_read_unlock();
2299 health_code_update();
594c7c00 2300 return ret_func;
3bd1e081
MD
2301}
2302
28ab034a 2303int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
fc6d7a51 2304{
a0377dfe
FD
2305 LTTNG_ASSERT(stream);
2306 LTTNG_ASSERT(stream->ustream);
fc6d7a51 2307
881fc67f 2308 return lttng_ust_ctl_flush_buffer(stream->ustream, producer_active);
fc6d7a51
JD
2309}
2310
ffe60014 2311/*
e9404c27 2312 * Take a snapshot for a specific stream.
ffe60014
DG
2313 *
2314 * Returns 0 on success, < 0 on error
2315 */
2316int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 2317{
a0377dfe
FD
2318 LTTNG_ASSERT(stream);
2319 LTTNG_ASSERT(stream->ustream);
ffe60014 2320
b623cb6a 2321 return lttng_ust_ctl_snapshot(stream->ustream);
3bd1e081
MD
2322}
2323
e9404c27
JG
2324/*
2325 * Sample consumed and produced positions for a specific stream.
2326 *
2327 * Returns 0 on success, < 0 on error.
2328 */
28ab034a 2329int lttng_ustconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
e9404c27 2330{
a0377dfe
FD
2331 LTTNG_ASSERT(stream);
2332 LTTNG_ASSERT(stream->ustream);
e9404c27 2333
b623cb6a 2334 return lttng_ust_ctl_snapshot_sample_positions(stream->ustream);
e9404c27
JG
2335}
2336
ffe60014
DG
2337/*
2338 * Get the produced position
2339 *
2340 * Returns 0 on success, < 0 on error
2341 */
28ab034a
JG
2342int lttng_ustconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
2343 unsigned long *pos)
3bd1e081 2344{
a0377dfe
FD
2345 LTTNG_ASSERT(stream);
2346 LTTNG_ASSERT(stream->ustream);
2347 LTTNG_ASSERT(pos);
7a57cf92 2348
b623cb6a 2349 return lttng_ust_ctl_snapshot_get_produced(stream->ustream, pos);
ffe60014 2350}
7a57cf92 2351
10a50311
JD
2352/*
2353 * Get the consumed position
2354 *
2355 * Returns 0 on success, < 0 on error
2356 */
28ab034a
JG
2357int lttng_ustconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
2358 unsigned long *pos)
10a50311 2359{
a0377dfe
FD
2360 LTTNG_ASSERT(stream);
2361 LTTNG_ASSERT(stream->ustream);
2362 LTTNG_ASSERT(pos);
10a50311 2363
b623cb6a 2364 return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos);
10a50311
JD
2365}
2366
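/*
 * Illustrative sketch (hypothetical helper, not part of the original file):
 * how the position-sampling helpers above are typically combined by a caller.
 * The stream lock is assumed to be held, as for the other accessors here.
 * Declared inline so the sketch does not trigger an unused-function warning.
 */
static inline int example_sample_stream_positions(struct lttng_consumer_stream *stream,
						   unsigned long *consumed,
						   unsigned long *produced)
{
	/* Take one snapshot so both positions refer to the same point in time. */
	int ret = lttng_ustconsumer_sample_snapshot_positions(stream);

	if (ret < 0) {
		return ret;
	}

	ret = lttng_ustconsumer_get_consumed_snapshot(stream, consumed);
	if (ret < 0) {
		return ret;
	}

	return lttng_ustconsumer_get_produced_snapshot(stream, produced);
}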
28ab034a 2367int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, int producer)
84a182ce 2368{
a0377dfe
FD
2369 LTTNG_ASSERT(stream);
2370 LTTNG_ASSERT(stream->ustream);
84a182ce 2371
881fc67f 2372 return lttng_ust_ctl_flush_buffer(stream->ustream, producer);
84a182ce
DG
2373}
2374
881fc67f 2375int lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream)
214f70e0 2376{
a0377dfe
FD
2377 LTTNG_ASSERT(stream);
2378 LTTNG_ASSERT(stream->ustream);
214f70e0 2379
881fc67f 2380 return lttng_ust_ctl_clear_buffer(stream->ustream);
214f70e0
JR
2381}
2382
28ab034a 2383int lttng_ustconsumer_get_current_timestamp(struct lttng_consumer_stream *stream, uint64_t *ts)
84a182ce 2384{
a0377dfe
FD
2385 LTTNG_ASSERT(stream);
2386 LTTNG_ASSERT(stream->ustream);
2387 LTTNG_ASSERT(ts);
84a182ce 2388
b623cb6a 2389 return lttng_ust_ctl_get_current_timestamp(stream->ustream, ts);
84a182ce
DG
2390}
2391
28ab034a 2392int lttng_ustconsumer_get_sequence_number(struct lttng_consumer_stream *stream, uint64_t *seq)
fb83fe64 2393{
a0377dfe
FD
2394 LTTNG_ASSERT(stream);
2395 LTTNG_ASSERT(stream->ustream);
2396 LTTNG_ASSERT(seq);
fb83fe64 2397
b623cb6a 2398 return lttng_ust_ctl_get_sequence_number(stream->ustream, seq);
fb83fe64
JD
2399}
2400
ffe60014 2401/*
0dd01979 2402 * Called when the stream signals the consumer that it has hung up.
ffe60014
DG
2403 */
2404void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
2405{
a0377dfe
FD
2406 LTTNG_ASSERT(stream);
2407 LTTNG_ASSERT(stream->ustream);
2c1dd183 2408
0dd01979
MD
2409 pthread_mutex_lock(&stream->lock);
2410 if (!stream->quiescent) {
881fc67f
MD
2411 if (lttng_ust_ctl_flush_buffer(stream->ustream, 0) < 0) {
2412 ERR("Failed to flush buffer on stream hang-up");
2413 } else {
2414 stream->quiescent = true;
2415 }
0dd01979 2416 }
d9ab8c66 2417
ffe60014 2418 stream->hangup_flush_done = 1;
d9ab8c66 2419 pthread_mutex_unlock(&stream->lock);
ffe60014 2420}
ee77a7b0 2421
ffe60014
DG
2422void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
2423{
4628484a
MD
2424 int i;
2425
a0377dfe
FD
2426 LTTNG_ASSERT(chan);
2427 LTTNG_ASSERT(chan->uchan);
2428 LTTNG_ASSERT(chan->buffer_credentials.is_set);
e316aad5 2429
ea88ca2a
MD
2430 if (chan->switch_timer_enabled == 1) {
2431 consumer_timer_switch_stop(chan);
2432 }
4628484a
MD
2433 for (i = 0; i < chan->nr_stream_fds; i++) {
2434 int ret;
2435
2436 ret = close(chan->stream_fds[i]);
2437 if (ret) {
2438 PERROR("close");
2439 }
2440 if (chan->shm_path[0]) {
2441 char shm_path[PATH_MAX];
2442
2443 ret = get_stream_shm_path(shm_path, chan->shm_path, i);
2444 if (ret) {
2445 ERR("Cannot get stream shm path");
2446 }
d2956687 2447 ret = run_as_unlink(shm_path,
28ab034a
JG
2448 lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
2449 chan->buffer_credentials)),
2450 lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
2451 chan->buffer_credentials)));
4628484a 2452 if (ret) {
4628484a
MD
2453 PERROR("unlink %s", shm_path);
2454 }
2455 }
2456 }
3bd1e081
MD
2457}
2458
b83e03c4
MD
2459void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
2460{
a0377dfe
FD
2461 LTTNG_ASSERT(chan);
2462 LTTNG_ASSERT(chan->uchan);
2463 LTTNG_ASSERT(chan->buffer_credentials.is_set);
b83e03c4
MD
2464
2465 consumer_metadata_cache_destroy(chan);
b623cb6a 2466 lttng_ust_ctl_destroy_channel(chan->uchan);
ea853771
JR
2467 /* Try to rmdir all directories under shm_path root. */
2468 if (chan->root_shm_path[0]) {
28ab034a
JG
2469 (void) run_as_rmdir_recursive(
2470 chan->root_shm_path,
2471 lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)),
2472 lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)),
2473 LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
ea853771 2474 }
b83e03c4
MD
2475 free(chan->stream_fds);
2476}
2477
3bd1e081
MD
2478void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
2479{
a0377dfe
FD
2480 LTTNG_ASSERT(stream);
2481 LTTNG_ASSERT(stream->ustream);
d41f73b7 2482
ea88ca2a
MD
2483 if (stream->chan->switch_timer_enabled == 1) {
2484 consumer_timer_switch_stop(stream->chan);
2485 }
b623cb6a 2486 lttng_ust_ctl_destroy_stream(stream->ustream);
ffe60014 2487}
d41f73b7 2488
6d574024
DG
2489int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
2490{
a0377dfe
FD
2491 LTTNG_ASSERT(stream);
2492 LTTNG_ASSERT(stream->ustream);
6d574024 2493
b623cb6a 2494 return lttng_ust_ctl_stream_get_wakeup_fd(stream->ustream);
6d574024
DG
2495}
2496
2497int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
2498{
a0377dfe
FD
2499 LTTNG_ASSERT(stream);
2500 LTTNG_ASSERT(stream->ustream);
6d574024 2501
b623cb6a 2502 return lttng_ust_ctl_stream_close_wakeup_fd(stream->ustream);
6d574024
DG
2503}
2504
94d49140
JD
2505/*
2506 * Write up to one packet from the metadata cache to the channel.
2507 *
577eea73
JG
2508 * Returns the number of bytes pushed from the cache into the ring buffer, or a
2509 * negative value on error.
94d49140 2510 */
28ab034a 2511static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
94d49140
JD
2512{
2513 ssize_t write_len;
2514 int ret;
2515
2516 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
28ab034a 2517 if (stream->chan->metadata_cache->contents.size == stream->ust_metadata_pushed) {
55954e07
JG
2518 /*
2519 * In the context of a user space metadata channel, a
2520 * change in version can be detected in two ways:
2521 * 1) During the pre-consume of the `read_subbuffer` loop,
2522 * 2) When populating the metadata ring buffer (i.e. here).
2523 *
2524 * This function is invoked when there is no metadata
2525 * available in the ring-buffer. If all data was consumed
2526 * up to the size of the metadata cache, there is no metadata
2527 * to insert in the ring-buffer.
2528 *
2529 * However, the metadata version could still have changed (a
2530 * regeneration without any new data will yield the same cache
2531 * size).
2532 *
2533 * The cache's version is checked for a version change and the
2534 * consumed position is reset if one occurred.
2535 *
2536 * This check is only necessary for the user space domain as
2537 * it has to manage the cache explicitly. If this reset was not
2538 * performed, no metadata would be consumed (and no reset would
2539 * occur as part of the pre-consume) until the metadata size
2540 * exceeded the cache size.
2541 */
28ab034a 2542 if (stream->metadata_version != stream->chan->metadata_cache->version) {
55954e07
JG
2543 metadata_stream_reset_cache_consumed_position(stream);
2544 consumer_stream_metadata_set_version(stream,
28ab034a 2545 stream->chan->metadata_cache->version);
55954e07
JG
2546 } else {
2547 ret = 0;
2548 goto end;
2549 }
94d49140
JD
2550 }
2551
28ab034a
JG
2552 write_len = lttng_ust_ctl_write_one_packet_to_channel(
2553 stream->chan->uchan,
2554 &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed],
2555 stream->chan->metadata_cache->contents.size - stream->ust_metadata_pushed);
a0377dfe 2556 LTTNG_ASSERT(write_len != 0);
94d49140
JD
2557 if (write_len < 0) {
2558 ERR("Writing one metadata packet");
f5ba75b4 2559 ret = write_len;
94d49140
JD
2560 goto end;
2561 }
2562 stream->ust_metadata_pushed += write_len;
2563
28ab034a 2564 LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
94d49140
JD
2565 ret = write_len;
2566
0d88e046
JG
2567 /*
2568 * Switch packet (but don't open the next one) on every commit of
2569 * a metadata packet. Since the subbuffer is fully filled (with padding,
2570 * if needed), the stream is "quiescent" after this commit.
2571 */
881fc67f 2572 if (lttng_ust_ctl_flush_buffer(stream->ustream, 1)) {
edb555b5 2573 ERR("Failed to flush buffer while committing one metadata packet");
881fc67f
MD
2574 ret = -EIO;
2575 } else {
2576 stream->quiescent = true;
2577 }
94d49140
JD
2578end:
2579 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
2580 return ret;
2581}
2582
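/*
 * Illustrative sketch (hypothetical helper, not part of the original file):
 * push the metadata cache contents into the ring buffer one packet at a time.
 * Per the helper above, commit_one_metadata_packet() returns the number of
 * bytes pushed, 0 when the cache is exhausted, and a negative value on error
 * (e.g. -ENOBUFS when the ring buffer is full), so the loop stops on the
 * first non-positive return.
 */
static inline int example_drain_metadata_cache(struct lttng_consumer_stream *stream)
{
	int ret;

	do {
		ret = commit_one_metadata_packet(stream);
	} while (ret > 0);

	return ret;
}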
94d49140
JD
2583/*
2584 * Sync metadata, meaning request it from the session daemon and take a snapshot so
2585 * that the metadata thread can consume it.
2586 *
c585821b
MD
2587 * Metadata stream lock is held here, but we need to release it when
2588 * interacting with the sessiond, else we cause a deadlock with live
2589 * viewers awaiting metadata to be pushed out.
94d49140 2590 *
cdb72e4e 2591 * The RCU read side lock must be held by the caller.
94d49140 2592 */
28ab034a
JG
2593enum sync_metadata_status
2594lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
2595 struct lttng_consumer_stream *metadata_stream)
94d49140
JD
2596{
2597 int ret;
577eea73 2598 enum sync_metadata_status status;
cdb72e4e 2599 struct lttng_consumer_channel *metadata_channel;
94d49140 2600
a0377dfe
FD
2601 LTTNG_ASSERT(ctx);
2602 LTTNG_ASSERT(metadata_stream);
48b7cdc2 2603 ASSERT_RCU_READ_LOCKED();
94d49140 2604
cdb72e4e
JG
2605 metadata_channel = metadata_stream->chan;
2606 pthread_mutex_unlock(&metadata_stream->lock);
94d49140
JD
2607 /*
2608 * Request metadata from the sessiond, but don't wait for the flush
2609 * because we locked the metadata thread.
2610 */
cdb72e4e
JG
2611 ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
2612 pthread_mutex_lock(&metadata_stream->lock);
94d49140 2613 if (ret < 0) {
577eea73 2614 status = SYNC_METADATA_STATUS_ERROR;
94d49140
JD
2615 goto end;
2616 }
2617
cdb72e4e
JG
2618 /*
2619 * The metadata stream and channel can be deleted while the
2620 * metadata stream lock is released. The stream is checked
2621 * for deletion before we use it further.
2622 *
2623 * Note that it is safe to access a logically-deleted stream since its
2624 * existence is still guaranteed by the RCU read side lock. However,
2625 * it should no longer be used. The close/deletion of the metadata
2626 * channel and stream already guarantees that all metadata has been
2627 * consumed. Therefore, there is nothing left to do in this function.
2628 */
2629 if (consumer_stream_is_deleted(metadata_stream)) {
2630 DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
28ab034a 2631 metadata_stream->key);
577eea73 2632 status = SYNC_METADATA_STATUS_NO_DATA;
cdb72e4e
JG
2633 goto end;
2634 }
2635
2636 ret = commit_one_metadata_packet(metadata_stream);
577eea73
JG
2637 if (ret < 0) {
2638 status = SYNC_METADATA_STATUS_ERROR;
94d49140
JD
2639 goto end;
2640 } else if (ret > 0) {
577eea73
JG
2641 status = SYNC_METADATA_STATUS_NEW_DATA;
2642 } else /* ret == 0 */ {
2643 status = SYNC_METADATA_STATUS_NO_DATA;
2644 goto end;
94d49140
JD
2645 }
2646
b623cb6a 2647 ret = lttng_ust_ctl_snapshot(metadata_stream->ustream);
94d49140 2648 if (ret < 0) {
28ab034a
JG
2649 ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d",
2650 ret);
577eea73 2651 status = SYNC_METADATA_STATUS_ERROR;
94d49140
JD
2652 goto end;
2653 }
2654
94d49140 2655end:
577eea73 2656 return status;
94d49140
JD
2657}
2658
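/*
 * Illustrative sketch (hypothetical caller, not part of the original file):
 * one way a metadata thread could interpret the status returned by
 * lttng_ustconsumer_sync_metadata(). The metadata stream lock and the RCU
 * read-side lock are assumed to be held, as documented above.
 */
static inline bool example_metadata_ready_for_read(struct lttng_consumer_local_data *ctx,
						   struct lttng_consumer_stream *metadata_stream)
{
	switch (lttng_ustconsumer_sync_metadata(ctx, metadata_stream)) {
	case SYNC_METADATA_STATUS_NEW_DATA:
		/* A ring-buffer snapshot was taken; the stream can be read. */
		return true;
	case SYNC_METADATA_STATUS_NO_DATA:
	case SYNC_METADATA_STATUS_ERROR:
	default:
		/* Nothing to read, or the caller should report an error. */
		return false;
	}
}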
02b3d176
DG
2659/*
2660 * Return 0 on success else a negative value.
2661 */
2662static int notify_if_more_data(struct lttng_consumer_stream *stream,
28ab034a 2663 struct lttng_consumer_local_data *ctx)
02b3d176
DG
2664{
2665 int ret;
b623cb6a 2666 struct lttng_ust_ctl_consumer_stream *ustream;
02b3d176 2667
a0377dfe
FD
2668 LTTNG_ASSERT(stream);
2669 LTTNG_ASSERT(ctx);
02b3d176
DG
2670
2671 ustream = stream->ustream;
2672
2673 /*
2674 * First, we are going to check if there is a new subbuffer available
2675 * before reading the stream wait_fd.
2676 */
2677 /* Get the next subbuffer */
b623cb6a 2678 ret = lttng_ust_ctl_get_next_subbuf(ustream);
02b3d176
DG
2679 if (ret) {
2680 /* No more data found, flag the stream. */
2681 stream->has_data = 0;
2682 ret = 0;
2683 goto end;
2684 }
2685
b623cb6a 2686 ret = lttng_ust_ctl_put_subbuf(ustream);
a0377dfe 2687 LTTNG_ASSERT(!ret);
02b3d176
DG
2688
2689 /* This stream still has data. Flag it and wake up the data thread. */
2690 stream->has_data = 1;
2691
2692 if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
2693 ssize_t writelen;
2694
2695 writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
2696 if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2697 ret = writelen;
2698 goto end;
2699 }
2700
2701 /* The wake up pipe has been notified. */
2702 ctx->has_wakeup = 1;
2703 }
2704 ret = 0;
2705
2706end:
2707 return ret;
2708}
2709
6f9449c2 2710static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream)
fb83fe64 2711{
6f9449c2 2712 int ret = 0;
fb83fe64 2713
fb83fe64 2714 /*
6f9449c2
JG
2715 * We can consume the 1 byte written into the wait_fd by
2716 * UST. Don't trigger an error if we cannot read this one byte
2717 * (read returns 0), or if the error is EAGAIN or EWOULDBLOCK.
2718 *
2719 * This is only done when the stream is monitored by a thread,
2720 * before the flush is done after a hangup and if the stream
2721 * is not flagged with data, since there might be nothing to
2722 * consume in the wait fd while data may still be available,
2723 * as flagged by the consumer wake-up pipe.
fb83fe64 2724 */
6f9449c2
JG
2725 if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
2726 char dummy;
2727 ssize_t readlen;
2728
2729 readlen = lttng_read(stream->wait_fd, &dummy, 1);
2730 if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2731 ret = readlen;
2732 }
fb83fe64 2733 }
fb83fe64 2734
6f9449c2
JG
2735 return ret;
2736}
2737
2738static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
28ab034a 2739 struct stream_subbuffer *subbuf)
6f9449c2
JG
2740{
2741 int ret;
2742
28ab034a 2743 ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &subbuf->info.data.subbuf_size);
6f9449c2 2744 if (ret) {
fb83fe64
JD
2745 goto end;
2746 }
6f9449c2 2747
28ab034a
JG
2748 ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream,
2749 &subbuf->info.data.padded_subbuf_size);
6f9449c2
JG
2750 if (ret) {
2751 goto end;
fb83fe64 2752 }
fb83fe64
JD
2753
2754end:
2755 return ret;
2756}
2757
6f9449c2 2758static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
28ab034a 2759 struct stream_subbuffer *subbuf)
d41f73b7 2760{
6f9449c2 2761 int ret;
ffe60014 2762
6f9449c2
JG
2763 ret = extract_common_subbuffer_info(stream, subbuf);
2764 if (ret) {
2765 goto end;
2766 }
d41f73b7 2767
55954e07 2768 subbuf->info.metadata.version = stream->metadata_version;
ffe60014 2769
6f9449c2
JG
2770end:
2771 return ret;
2772}
d41f73b7 2773
6f9449c2 2774static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
28ab034a 2775 struct stream_subbuffer *subbuf)
6f9449c2
JG
2776{
2777 int ret;
c617c0c6 2778
6f9449c2
JG
2779 ret = extract_common_subbuffer_info(stream, subbuf);
2780 if (ret) {
2781 goto end;
02d02e31
JD
2782 }
2783
28ab034a 2784 ret = lttng_ust_ctl_get_packet_size(stream->ustream, &subbuf->info.data.packet_size);
6f9449c2
JG
2785 if (ret < 0) {
2786 PERROR("Failed to get sub-buffer packet size");
2787 goto end;
2788 }
04ef1097 2789
28ab034a 2790 ret = lttng_ust_ctl_get_content_size(stream->ustream, &subbuf->info.data.content_size);
6f9449c2
JG
2791 if (ret < 0) {
2792 PERROR("Failed to get sub-buffer content size");
2793 goto end;
d41f73b7 2794 }
309167d2 2795
28ab034a
JG
2796 ret = lttng_ust_ctl_get_timestamp_begin(stream->ustream,
2797 &subbuf->info.data.timestamp_begin);
6f9449c2
JG
2798 if (ret < 0) {
2799 PERROR("Failed to get sub-buffer begin timestamp");
2800 goto end;
2801 }
fb83fe64 2802
28ab034a 2803 ret = lttng_ust_ctl_get_timestamp_end(stream->ustream, &subbuf->info.data.timestamp_end);
6f9449c2
JG
2804 if (ret < 0) {
2805 PERROR("Failed to get sub-buffer end timestamp");
2806 goto end;
2807 }
2808
28ab034a
JG
2809 ret = lttng_ust_ctl_get_events_discarded(stream->ustream,
2810 &subbuf->info.data.events_discarded);
6f9449c2
JG
2811 if (ret) {
2812 PERROR("Failed to get sub-buffer events discarded count");
2813 goto end;
2814 }
2815
b623cb6a 2816 ret = lttng_ust_ctl_get_sequence_number(stream->ustream,
28ab034a 2817 &subbuf->info.data.sequence_number.value);
6f9449c2
JG
2818 if (ret) {
2819 /* May not be supported by older LTTng-modules. */
2820 if (ret != -ENOTTY) {
2821 PERROR("Failed to get sub-buffer sequence number");
2822 goto end;
fb83fe64 2823 }
1c20f0e2 2824 } else {
6f9449c2 2825 subbuf->info.data.sequence_number.is_set = true;
309167d2
JD
2826 }
2827
28ab034a 2828 ret = lttng_ust_ctl_get_stream_id(stream->ustream, &subbuf->info.data.stream_id);
6f9449c2
JG
2829 if (ret < 0) {
2830 PERROR("Failed to get stream id");
2831 goto end;
2832 }
1d4dfdef 2833
b623cb6a 2834 ret = lttng_ust_ctl_get_instance_id(stream->ustream,
28ab034a 2835 &subbuf->info.data.stream_instance_id.value);
6f9449c2
JG
2836 if (ret) {
2837 /* May not be supported by older LTTng-modules. */
2838 if (ret != -ENOTTY) {
2839 PERROR("Failed to get stream instance id");
2840 goto end;
2841 }
2842 } else {
2843 subbuf->info.data.stream_instance_id.is_set = true;
2844 }
2845end:
2846 return ret;
2847}
1d4dfdef 2848
6f9449c2 2849static int get_next_subbuffer_common(struct lttng_consumer_stream *stream,
28ab034a 2850 struct stream_subbuffer *subbuffer)
6f9449c2
JG
2851{
2852 int ret;
2853 const char *addr;
1d4dfdef 2854
28ab034a 2855 ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
6f9449c2
JG
2856 if (ret) {
2857 goto end;
2858 }
02d02e31 2859
6f9449c2 2860 ret = get_current_subbuf_addr(stream, &addr);
128708c3 2861 if (ret) {
6f9449c2 2862 goto end;
128708c3
JG
2863 }
2864
28ab034a
JG
2865 subbuffer->buffer.buffer =
2866 lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
a0377dfe 2867 LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL);
6f9449c2
JG
2868end:
2869 return ret;
2870}
fd424d99 2871
28ab034a
JG
2872static enum get_next_subbuffer_status get_next_subbuffer(struct lttng_consumer_stream *stream,
2873 struct stream_subbuffer *subbuffer)
6f9449c2
JG
2874{
2875 int ret;
b6797c8e 2876 enum get_next_subbuffer_status status;
331744e3 2877
b623cb6a 2878 ret = lttng_ust_ctl_get_next_subbuf(stream->ustream);
b6797c8e
JG
2879 switch (ret) {
2880 case 0:
2881 status = GET_NEXT_SUBBUFFER_STATUS_OK;
2882 break;
2883 case -ENODATA:
28ab034a 2884 case -EAGAIN:
b6797c8e
JG
2885 /*
2886 * The caller only expects -ENODATA when there is no data to
2887 * read, but the kernel tracer returns -EAGAIN when there is
2888 * currently no data for a non-finalized stream, and -ENODATA
2889 * when there is no data for a finalized stream. Those can be
2890 * combined into a -ENODATA return value.
2891 */
2892 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
2893 goto end;
2894 default:
2895 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2 2896 goto end;
02b3d176
DG
2897 }
2898
6f9449c2
JG
2899 ret = get_next_subbuffer_common(stream, subbuffer);
2900 if (ret) {
b6797c8e 2901 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
23d56598 2902 goto end;
1c20f0e2 2903 }
6f9449c2 2904end:
b6797c8e 2905 return status;
6f9449c2 2906}
1c20f0e2 2907
28ab034a
JG
2908static enum get_next_subbuffer_status
2909get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
2910 struct stream_subbuffer *subbuffer)
6f9449c2
JG
2911{
2912 int ret;
f5ba75b4
JG
2913 bool cache_empty;
2914 bool got_subbuffer;
2915 bool coherent;
2916 bool buffer_empty;
2917 unsigned long consumed_pos, produced_pos;
b6797c8e 2918 enum get_next_subbuffer_status status;
6f9449c2 2919
f5ba75b4 2920 do {
b623cb6a 2921 ret = lttng_ust_ctl_get_next_subbuf(stream->ustream);
f5ba75b4
JG
2922 if (ret == 0) {
2923 got_subbuffer = true;
2924 } else {
2925 got_subbuffer = false;
2926 if (ret != -EAGAIN) {
2927 /* Fatal error. */
b6797c8e 2928 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
f5ba75b4
JG
2929 goto end;
2930 }
c585821b
MD
2931 }
2932
f5ba75b4
JG
2933 /*
2934 * Determine if the cache is empty and ensure that a sub-buffer
2935 * is made available if the cache is not empty.
2936 */
2937 if (!got_subbuffer) {
2938 ret = commit_one_metadata_packet(stream);
2939 if (ret < 0 && ret != -ENOBUFS) {
b6797c8e 2940 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
f5ba75b4
JG
2941 goto end;
2942 } else if (ret == 0) {
2943 /* Not an error, the cache is empty. */
2944 cache_empty = true;
b6797c8e 2945 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
f5ba75b4
JG
2946 goto end;
2947 } else {
2948 cache_empty = false;
2949 }
2950 } else {
2951 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
9eac9828 2952 cache_empty = stream->chan->metadata_cache->contents.size ==
28ab034a 2953 stream->ust_metadata_pushed;
f5ba75b4 2954 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
94d49140 2955 }
f5ba75b4 2956 } while (!got_subbuffer);
94d49140 2957
f5ba75b4 2958 /* Populate sub-buffer infos and view. */
6f9449c2
JG
2959 ret = get_next_subbuffer_common(stream, subbuffer);
2960 if (ret) {
b6797c8e 2961 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2 2962 goto end;
309167d2 2963 }
f5ba75b4
JG
2964
2965 ret = lttng_ustconsumer_sample_snapshot_positions(stream);
2966 if (ret < 0) {
2967 /*
2968 * -EAGAIN is not expected since we got a sub-buffer and haven't
2969 * pushed the consumption position yet (on put_next).
2970 */
2971 PERROR("Failed to take a snapshot of metadata buffer positions");
b6797c8e 2972 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
f5ba75b4
JG
2973 goto end;
2974 }
2975
2976 ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
2977 if (ret) {
2978 PERROR("Failed to get metadata consumed position");
b6797c8e 2979 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
f5ba75b4
JG
2980 goto end;
2981 }
2982
2983 ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
2984 if (ret) {
2985 PERROR("Failed to get metadata produced position");
b6797c8e 2986 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
f5ba75b4
JG
2987 goto end;
2988 }
2989
2990 /* Last sub-buffer of the ring buffer? */
2991 buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos;
2992
2993 /*
2994 * The sessiond registry lock ensures that coherent units of metadata
2995 * are pushed to the consumer daemon at once. Hence, if a sub-buffer is
2996 * acquired, the cache is empty, and it is the only sub-buffer
2997 * available, it is safe to assume that it is "coherent".
2998 */
2999 coherent = got_subbuffer && cache_empty && buffer_empty;
3000
3001 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
b6797c8e 3002 status = GET_NEXT_SUBBUFFER_STATUS_OK;
23d56598 3003end:
b6797c8e 3004 return status;
d41f73b7
MD
3005}
3006
6f9449c2 3007static int put_next_subbuffer(struct lttng_consumer_stream *stream,
28ab034a 3008 struct stream_subbuffer *subbuffer __attribute__((unused)))
6f9449c2 3009{
b623cb6a 3010 const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream);
6f9449c2 3011
a0377dfe 3012 LTTNG_ASSERT(ret == 0);
6f9449c2
JG
3013 return ret;
3014}
3015
3016static int signal_metadata(struct lttng_consumer_stream *stream,
28ab034a 3017 struct lttng_consumer_local_data *ctx __attribute__((unused)))
6f9449c2 3018{
8db3acaf 3019 ASSERT_LOCKED(stream->metadata_rdv_lock);
6f9449c2
JG
3020 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
3021}
3022
28ab034a 3023static int lttng_ustconsumer_set_stream_ops(struct lttng_consumer_stream *stream)
6f9449c2 3024{
f5ba75b4
JG
3025 int ret = 0;
3026
6f9449c2
JG
3027 stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
3028 if (stream->metadata_flag) {
28ab034a
JG
3029 stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_metadata;
3030 stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info;
6f9449c2 3031 stream->read_subbuffer_ops.reset_metadata =
28ab034a 3032 metadata_stream_reset_cache_consumed_position;
f5ba75b4
JG
3033 if (stream->chan->is_live) {
3034 stream->read_subbuffer_ops.on_sleep = signal_metadata;
28ab034a 3035 ret = consumer_stream_enable_metadata_bucketization(stream);
f5ba75b4
JG
3036 if (ret) {
3037 goto end;
3038 }
3039 }
6f9449c2 3040 } else {
28ab034a
JG
3041 stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer;
3042 stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info;
6f9449c2
JG
3043 stream->read_subbuffer_ops.on_sleep = notify_if_more_data;
3044 if (stream->chan->is_live) {
28ab034a 3045 stream->read_subbuffer_ops.send_live_beacon = consumer_flush_ust_index;
6f9449c2
JG
3046 }
3047 }
3048
3049 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
f5ba75b4
JG
3050end:
3051 return ret;
6f9449c2
JG
3052}
3053
ffe60014
DG
3054/*
3055 * Called when a stream is created.
fe4477ee
JD
3056 *
3057 * Return 0 on success or else a negative value.
ffe60014 3058 */
d41f73b7
MD
3059int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
3060{
fe4477ee
JD
3061 int ret;
3062
a0377dfe 3063 LTTNG_ASSERT(stream);
10a50311 3064
d2956687
JG
3065 /*
3066 * Don't create anything if this is set for streaming or if there is
3067 * no current trace chunk on the parent channel.
3068 */
3069 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
28ab034a 3070 stream->chan->trace_chunk) {
d2956687
JG
3071 ret = consumer_stream_create_output_files(stream, true);
3072 if (ret) {
fe4477ee
JD
3073 goto error;
3074 }
fe4477ee 3075 }
6f9449c2
JG
3076
3077 lttng_ustconsumer_set_stream_ops(stream);
fe4477ee
JD
3078 ret = 0;
3079
3080error:
3081 return ret;
d41f73b7 3082}
ca22feea
DG
3083
3084/*
3085 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
3086 * stream. The consumer data lock and the stream lock MUST be acquired
3087 * before calling this function.
ca22feea 3088 *
6d805429 3089 * Return 1 if the traced data is still being read, else 0, meaning that the
ca22feea
DG
3090 * data is available for trace viewer reading.
3091 */
6d805429 3092int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
3093{
3094 int ret;
3095
a0377dfe
FD
3096 LTTNG_ASSERT(stream);
3097 LTTNG_ASSERT(stream->ustream);
b1316da1 3098 ASSERT_LOCKED(stream->lock);
ca22feea 3099
6d805429 3100 DBG("UST consumer checking data pending");
c8f59ee5 3101
ca6b395f
MD
3102 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
3103 ret = 0;
3104 goto end;
3105 }
3106
04ef1097 3107 if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
e6ee4eab
DG
3108 uint64_t contiguous, pushed;
3109
3110 /* Ease our life a bit. */
934ba8fd 3111 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
9eac9828 3112 contiguous = stream->chan->metadata_cache->contents.size;
934ba8fd 3113 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
e6ee4eab
DG
3114 pushed = stream->ust_metadata_pushed;
3115
04ef1097
MD
3116 /*
3117 * We can simply check whether all contiguously available data
3118 * has been pushed to the ring buffer, since the push operation
3119 * is performed within get_next_subbuf(), and because both
3120 * get_next_subbuf() and put_next_subbuf() are issued atomically
3121 * thanks to the stream lock within
3122 * lttng_ustconsumer_read_subbuffer(). This basically means that
3123 * whenever ust_metadata_pushed is incremented, the associated
3124 * metadata has been consumed from the metadata stream.
3125 */
28ab034a
JG
3126 DBG("UST consumer metadata pending check: contiguous %" PRIu64
3127 " vs pushed %" PRIu64,
3128 contiguous,
3129 pushed);
a0377dfe 3130 LTTNG_ASSERT(((int64_t) (contiguous - pushed)) >= 0);
e6ee4eab 3131 if ((contiguous != pushed) ||
28ab034a
JG
3132 (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
3133 ret = 1; /* Data is pending */
04ef1097
MD
3134 goto end;
3135 }
3136 } else {
b623cb6a 3137 ret = lttng_ust_ctl_get_next_subbuf(stream->ustream);
04ef1097
MD
3138 if (ret == 0) {
3139 /*
3140 * There is still data so let's put back this
3141 * subbuffer.
3142 */
b623cb6a 3143 ret = lttng_ust_ctl_put_subbuf(stream->ustream);
a0377dfe 3144 LTTNG_ASSERT(ret == 0);
28ab034a 3145 ret = 1; /* Data is pending */
04ef1097
MD
3146 goto end;
3147 }
ca22feea
DG
3148 }
3149
6d805429
DG
3150 /* Data is NOT pending so ready to be read. */
3151 ret = 0;
ca22feea 3152
6efae65e
DG
3153end:
3154 return ret;
ca22feea 3155}
d88aee68 3156
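/*
 * Illustrative sketch (hypothetical caller, not part of the original file):
 * lttng_ustconsumer_data_pending() expects the consumer data lock and the
 * stream lock to be held, as documented above; a caller would typically wrap
 * it as follows.
 */
static inline int example_stream_data_pending(struct lttng_consumer_stream *stream)
{
	int data_pending;

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->lock);
	data_pending = lttng_ustconsumer_data_pending(stream);
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	return data_pending;
}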
6d574024
DG
3157/*
3158 * Stop a given metadata channel timer if enabled and close the wait fd which
3159 * is the poll pipe of the metadata stream.
3160 *
d2c82a5a 3161 * This MUST be called with the metadata channel lock acquired.
6d574024
DG
3162 */
3163void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
3164{
3165 int ret;
3166
a0377dfe
FD
3167 LTTNG_ASSERT(metadata);
3168 LTTNG_ASSERT(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
6d574024
DG
3169
3170 DBG("Closing metadata channel key %" PRIu64, metadata->key);
3171
3172 if (metadata->switch_timer_enabled == 1) {
3173 consumer_timer_switch_stop(metadata);
3174 }
3175
3176 if (!metadata->metadata_stream) {
3177 goto end;
3178 }
3179
3180 /*
3181 * Close the write side so the thread monitoring the stream, if any, wakes up
3182 * and cleans up the metadata stream.
3183 */
3184 if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
3185 ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
3186 if (ret < 0) {
3187 PERROR("closing metadata pipe write side");
3188 }
3189 metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
3190 }
3191
3192end:
3193 return;
3194}
3195
d88aee68
DG
3196/*
3197 * Close every metadata stream wait fd of the metadata hash table. This
3198 * function MUST be used very carefully so as not to run into a race between the
3199 * metadata thread handling streams and this function closing their wait fd.
3200 *
3201 * For UST, this is used when the session daemon hangs up. It's the metadata
3202 * producer, so calling this is safe because we are assured that no state change
3203 * can occur in the metadata thread for the streams in the hash table.
3204 */
6d574024 3205void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
d88aee68 3206{
d88aee68
DG
3207 struct lttng_ht_iter iter;
3208 struct lttng_consumer_stream *stream;
3209
a0377dfe
FD
3210 LTTNG_ASSERT(metadata_ht);
3211 LTTNG_ASSERT(metadata_ht->ht);
d88aee68
DG
3212
3213 DBG("UST consumer closing all metadata streams");
3214
3215 rcu_read_lock();
28ab034a 3216 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
9ce5646a
MD
3217 health_code_update();
3218
be2b50c7 3219 pthread_mutex_lock(&stream->chan->lock);
6d574024 3220 lttng_ustconsumer_close_metadata(stream->chan);
be2b50c7 3221 pthread_mutex_unlock(&stream->chan->lock);
d88aee68
DG
3222 }
3223 rcu_read_unlock();
3224}
d8ef542d
MD
3225
3226void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
3227{
3228 int ret;
3229
b623cb6a 3230 ret = lttng_ust_ctl_stream_close_wakeup_fd(stream->ustream);
d8ef542d
MD
3231 if (ret < 0) {
3232 ERR("Unable to close wakeup fd");
3233 }
3234}
331744e3 3235
f666ae70
MD
3236/*
3237 * Please refer to consumer-timer.c before adding any lock within this
3238 * function or any of its callees. Timers have a very strict locking
3239 * semantic with respect to teardown. Failure to respect this semantic
3240 * introduces deadlocks.
c585821b
MD
3241 *
3242 * DON'T hold the metadata lock when calling this function, else this
3243 * can cause deadlock involving consumer awaiting for metadata to be
3244 * pushed out due to concurrent interaction with the session daemon.
f666ae70 3245 */
331744e3 3246int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
28ab034a
JG
3247 struct lttng_consumer_channel *channel,
3248 int timer,
3249 int wait)
331744e3
JD
3250{
3251 struct lttcomm_metadata_request_msg request;
3252 struct lttcomm_consumer_msg msg;
0c759fc9 3253 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
93ec662e 3254 uint64_t len, key, offset, version;
331744e3
JD
3255 int ret;
3256
a0377dfe
FD
3257 LTTNG_ASSERT(channel);
3258 LTTNG_ASSERT(channel->metadata_cache);
331744e3 3259
53efb85a
MD
3260 memset(&request, 0, sizeof(request));
3261
331744e3 3262 /* send the metadata request to sessiond */
fa29bfbf 3263 switch (the_consumer_data.type) {
331744e3
JD
3264 case LTTNG_CONSUMER64_UST:
3265 request.bits_per_long = 64;
3266 break;
3267 case LTTNG_CONSUMER32_UST:
3268 request.bits_per_long = 32;
3269 break;
3270 default:
3271 request.bits_per_long = 0;
3272 break;
3273 }
3274
3275 request.session_id = channel->session_id;
1950109e 3276 request.session_id_per_pid = channel->session_id_per_pid;
567eb353
DG
3277 /*
3278 * Request the application UID here so the metadata of that application can
3279 * be sent back. The channel UID corresponds to the user UID of the session
3280 * used for the rights on the stream file(s).
3281 */
3282 request.uid = channel->ust_app_uid;
331744e3 3283 request.key = channel->key;
567eb353 3284
28ab034a
JG
3285 DBG("Sending metadata request to sessiond, session id %" PRIu64 ", per-pid %" PRIu64
3286 ", app UID %u and channel key %" PRIu64,
3287 request.session_id,
3288 request.session_id_per_pid,
3289 request.uid,
3290 request.key);
331744e3 3291
75d83e50 3292 pthread_mutex_lock(&ctx->metadata_socket_lock);
9ce5646a
MD
3293
3294 health_code_update();
3295
28ab034a 3296 ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, sizeof(request));
331744e3
JD
3297 if (ret < 0) {
3298 ERR("Asking metadata to sessiond");
3299 goto end;
3300 }
3301
9ce5646a
MD
3302 health_code_update();
3303
331744e3 3304 /* Receive the metadata from sessiond */
28ab034a 3305 ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, sizeof(msg));
331744e3 3306 if (ret != sizeof(msg)) {
28ab034a 3307 DBG("Consumer received unexpected message size %d (expects %zu)", ret, sizeof(msg));
331744e3
JD
3308 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
3309 /*
3310 * The ret value might be 0, meaning an orderly shutdown, but this is OK
3311 * since the caller handles this.
3312 */
3313 goto end;
3314 }
3315
9ce5646a
MD
3316 health_code_update();
3317
331744e3
JD
3318 if (msg.cmd_type == LTTNG_ERR_UND) {
3319 /* No registry found */
28ab034a 3320 (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
331744e3
JD
3321 ret = 0;
3322 goto end;
3323 } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
3324 ERR("Unexpected cmd_type received %d", msg.cmd_type);
3325 ret = -1;
3326 goto end;
3327 }
3328
3329 len = msg.u.push_metadata.len;
3330 key = msg.u.push_metadata.key;
3331 offset = msg.u.push_metadata.target_offset;
93ec662e 3332 version = msg.u.push_metadata.version;
331744e3 3333
a0377dfe 3334 LTTNG_ASSERT(key == channel->key);
331744e3
JD
3335 if (len == 0) {
3336 DBG("No new metadata to receive for key %" PRIu64, key);
3337 }
3338
9ce5646a
MD
3339 health_code_update();
3340
331744e3 3341 /* Tell session daemon we are ready to receive the metadata. */
28ab034a 3342 ret = consumer_send_status_msg(ctx->consumer_metadata_socket, LTTCOMM_CONSUMERD_SUCCESS);
331744e3
JD
3343 if (ret < 0 || len == 0) {
3344 /*
3345 * Somehow, the session daemon is not responding anymore or there is
3346 * nothing to receive.
3347 */
3348 goto end;
3349 }
3350
9ce5646a
MD
3351 health_code_update();
3352
28ab034a
JG
3353 ret = lttng_ustconsumer_recv_metadata(
3354 ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
1eb682be 3355 if (ret >= 0) {
f2a444f1
DG
3356 /*
3357 * Only send the status msg if the sessiond is alive, meaning a positive
3358 * ret code.
3359 */
1eb682be 3360 (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret);
f2a444f1 3361 }
331744e3
JD
3362 ret = 0;
3363
3364end:
9ce5646a
MD
3365 health_code_update();
3366
75d83e50 3367 pthread_mutex_unlock(&ctx->metadata_socket_lock);
331744e3
JD
3368 return ret;
3369}
70190e1c
DG
3370
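/*
 * Illustrative sketch (hypothetical caller, not part of the original file):
 * request metadata for a channel without waiting for the flush, mirroring the
 * (timer = 0, wait = 0) call made in lttng_ustconsumer_sync_metadata() above.
 * Per the comments above, the metadata lock must not be held here.
 */
static inline void example_request_channel_metadata(struct lttng_consumer_local_data *ctx,
						     struct lttng_consumer_channel *channel)
{
	const int ret = lttng_ustconsumer_request_metadata(ctx, channel, 0, 0);

	if (ret < 0) {
		ERR("Failed to request metadata for channel key %" PRIu64, channel->key);
	}
}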
3371/*
3372 * Return the result of the ustctl call to get the stream id.
3373 */
28ab034a 3374int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id)
70190e1c 3375{
a0377dfe
FD
3376 LTTNG_ASSERT(stream);
3377 LTTNG_ASSERT(stream_id);
70190e1c 3378
b623cb6a 3379 return lttng_ust_ctl_get_stream_id(stream->ustream, stream_id);
70190e1c 3380}
881fc67f
MD
3381
3382void lttng_ustconsumer_sigbus_handle(void *addr)
3383{
3384 lttng_ust_ctl_sigbus_handle(addr);
3385}
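/*
 * Illustrative sketch (hypothetical handler, not part of the original file):
 * a consumer daemon would typically install a SIGBUS handler that forwards
 * the faulting address to the helper above so that liblttng-ust-ctl can cope
 * with accesses to truncated shared-memory mappings.
 */
static inline void example_sigbus_action(int signo __attribute__((unused)),
					 siginfo_t *siginfo,
					 void *context __attribute__((unused)))
{
	lttng_ustconsumer_sigbus_handle(siginfo->si_addr);
}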