Fix: syscall event rule: emission sites not compared in is_equal
[lttng-tools.git] / src / common / consumer / consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
c9e313bc
SM
11#include <common/align.hpp>
12#include <common/common.hpp>
13#include <common/compat/endian.hpp>
14#include <common/compat/poll.hpp>
15#include <common/consumer/consumer-metadata-cache.hpp>
16#include <common/consumer/consumer-stream.hpp>
17#include <common/consumer/consumer-testpoint.hpp>
18#include <common/consumer/consumer-timer.hpp>
19#include <common/consumer/consumer.hpp>
20#include <common/dynamic-array.hpp>
21#include <common/index/ctf-index.hpp>
22#include <common/index/index.hpp>
671e39d7 23#include <common/io-hint.hpp>
c9e313bc
SM
24#include <common/kernel-consumer/kernel-consumer.hpp>
25#include <common/kernel-ctl/kernel-ctl.hpp>
26#include <common/relayd/relayd.hpp>
27#include <common/sessiond-comm/relayd.hpp>
28#include <common/sessiond-comm/sessiond-comm.hpp>
29#include <common/string-utils/format.hpp>
30#include <common/time.hpp>
31#include <common/trace-chunk-registry.hpp>
32#include <common/trace-chunk.hpp>
56047f5a 33#include <common/urcu.hpp>
c9e313bc
SM
34#include <common/ust-consumer/ust-consumer.hpp>
35#include <common/utils.hpp>
3bd1e081 36
28ab034a 37#include <bin/lttng-consumerd/health-consumerd.hpp>
671e39d7 38#include <fcntl.h>
28ab034a
JG
39#include <inttypes.h>
40#include <poll.h>
41#include <pthread.h>
42#include <signal.h>
43#include <stdlib.h>
44#include <string.h>
45#include <sys/mman.h>
46#include <sys/socket.h>
47#include <sys/types.h>
fbd566c2 48#include <type_traits>
28ab034a
JG
49#include <unistd.h>
50
97535efa 51lttng_consumer_global_data the_consumer_data;
3bd1e081 52
d8ef542d
MD
53enum consumer_channel_action {
54 CONSUMER_CHANNEL_ADD,
a0cbdd2e 55 CONSUMER_CHANNEL_DEL,
d8ef542d
MD
56 CONSUMER_CHANNEL_QUIT,
57};
58
f1494934 59namespace {
d8ef542d
MD
60struct consumer_channel_msg {
61 enum consumer_channel_action action;
28ab034a
JG
62 struct lttng_consumer_channel *chan; /* add */
63 uint64_t key; /* del */
d8ef542d
MD
64};
65
f1494934
JG
66/*
67 * Global hash table containing respectively metadata and data streams. The
68 * stream element in this ht should only be updated by the metadata poll thread
69 * for the metadata and the data poll thread for the data.
70 */
71struct lttng_ht *metadata_ht;
72struct lttng_ht *data_ht;
73} /* namespace */
74
80957876 75/* Flag used to temporarily pause data consumption from testpoints. */
cf0bcb51
JG
76int data_consumption_paused;
77
3bd1e081
MD
78/*
79 * Flag to inform the polling thread to quit when all fd hung up. Updated by
80 * the consumer_thread_receive_fds when it notices that all fds has hung up.
81 * Also updated by the signal handler (consumer_should_exit()). Read by the
82 * polling threads.
83 */
10211f5c 84int consumer_quit;
3bd1e081 85
cd9adb8b 86static const char *get_consumer_domain()
5da88b0f 87{
fa29bfbf 88 switch (the_consumer_data.type) {
5da88b0f
MD
89 case LTTNG_CONSUMER_KERNEL:
90 return DEFAULT_KERNEL_TRACE_DIR;
91 case LTTNG_CONSUMER64_UST:
92 /* Fall-through. */
93 case LTTNG_CONSUMER32_UST:
94 return DEFAULT_UST_TRACE_DIR;
95 default:
96 abort();
97 }
98}
99
acdb9057
DG
100/*
101 * Notify a thread lttng pipe to poll back again. This usually means that some
102 * global state has changed so we just send back the thread in a poll wait
103 * call.
104 */
105static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
106{
cd9adb8b 107 struct lttng_consumer_stream *null_stream = nullptr;
acdb9057 108
a0377dfe 109 LTTNG_ASSERT(pipe);
acdb9057 110
5c7248cd
JG
111 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
112 pointer. */
acdb9057
DG
113}
114
5c635c72
MD
115static void notify_health_quit_pipe(int *pipe)
116{
6cd525e8 117 ssize_t ret;
5c635c72 118
6cd525e8
MD
119 ret = lttng_write(pipe[1], "4", 1);
120 if (ret < 1) {
5c635c72
MD
121 PERROR("write consumer health quit");
122 }
123}
124
d8ef542d 125static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
126 struct lttng_consumer_channel *chan,
127 uint64_t key,
128 enum consumer_channel_action action)
d8ef542d
MD
129{
130 struct consumer_channel_msg msg;
6cd525e8 131 ssize_t ret;
d8ef542d 132
e56251fc
DG
133 memset(&msg, 0, sizeof(msg));
134
d8ef542d
MD
135 msg.action = action;
136 msg.chan = chan;
f21dae48 137 msg.key = key;
6cd525e8
MD
138 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
139 if (ret < sizeof(msg)) {
140 PERROR("notify_channel_pipe write error");
141 }
d8ef542d
MD
142}
143
/*
 * Ask the channel thread to delete the channel identified by 'key'. No
 * channel pointer is needed for a deletion, hence the nullptr.
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
{
	notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
}
148
d8ef542d 149static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
150 struct lttng_consumer_channel **chan,
151 uint64_t *key,
152 enum consumer_channel_action *action)
d8ef542d
MD
153{
154 struct consumer_channel_msg msg;
6cd525e8 155 ssize_t ret;
d8ef542d 156
6cd525e8
MD
157 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
158 if (ret < sizeof(msg)) {
159 ret = -1;
160 goto error;
d8ef542d 161 }
6cd525e8
MD
162 *action = msg.action;
163 *chan = msg.chan;
164 *key = msg.key;
165error:
166 return (int) ret;
d8ef542d
MD
167}
168
212d67a2
DG
/*
 * Cleanup the stream list of a channel. Those streams are not yet globally
 * visible.
 */
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	LTTNG_ASSERT(channel);

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
		/* NULL ht: these streams were never added to a global hash table. */
		consumer_stream_destroy(stream, nullptr);
	}
}
184
3bd1e081
MD
/*
 * Find a stream by key in the given hash table. The consumer_data.lock must
 * be locked during this call.
 *
 * Returns the stream, or NULL if the key is invalid (-1ULL) or not found.
 * The returned pointer is RCU-protected; an internal read-side lock covers
 * the lookup itself.
 */
static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_consumer_stream *stream = nullptr;

	LTTNG_ASSERT(ht);

	/* -1ULL keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		return nullptr;
	}

	lttng::urcu::read_lock_guard read_lock;

	lttng_ht_lookup(ht, &key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != nullptr) {
		stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
	}

	return stream;
}
212
/*
 * Invalidate the key of an existing stream with the given key so a new
 * stream can be inserted with that key without a hash-table collision.
 */
static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
{
	struct lttng_consumer_stream *stream;

	lttng::urcu::read_lock_guard read_lock;
	stream = find_stream(key, ht);
	if (stream) {
		stream->key = (uint64_t) -1ULL;
		/*
		 * We don't want the lookup to match, but we still need
		 * to iterate on this stream when iterating over the hash table. Just
		 * change the node key.
		 */
		stream->node.key = (uint64_t) -1ULL;
	}
}
229
d56db448
DG
/*
 * Return a channel object for the given key.
 *
 * RCU read side lock MUST be acquired before calling this function and
 * protects the channel ptr.
 *
 * Returns NULL when the key is the invalid -1ULL sentinel or no channel
 * matches.
 */
struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_consumer_channel *channel = nullptr;

	ASSERT_RCU_READ_LOCKED();

	/* -1ULL keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		return nullptr;
	}

	lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != nullptr) {
		channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
	}

	return channel;
}
257
b5a6470f
DG
/*
 * There is a possibility that the consumer does not have enough time between
 * the close of the channel on the session daemon and the cleanup in here thus
 * once we have a channel add with an existing key, we know for sure that this
 * channel will eventually get cleaned up by all streams being closed.
 *
 * This function just nullifies the already existing channel key.
 */
static void steal_channel_key(uint64_t key)
{
	struct lttng_consumer_channel *channel;

	lttng::urcu::read_lock_guard read_lock;
	channel = consumer_find_channel(key);
	if (channel) {
		channel->key = (uint64_t) -1ULL;
		/*
		 * We don't want the lookup to match, but we still need to iterate on
		 * this channel when iterating over the hash table. Just change the
		 * node key.
		 */
		channel->node.key = (uint64_t) -1ULL;
	}
}
282
/*
 * RCU callback releasing a channel's memory once a grace period has elapsed.
 * Dispatches to the UST-specific teardown before deleting the object itself.
 */
static void free_channel_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
	struct lttng_consumer_channel *channel =
		lttng::utils::container_of(node, &lttng_consumer_channel::node);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/* Nothing domain-specific to release for the kernel consumer. */
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_free_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}

	delete channel;
}
303
00e2e675
DG
/*
 * RCU protected relayd socket pair free.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
	struct consumer_relayd_sock_pair *relayd =
		lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing to this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
	free(relayd);
}
327
/*
 * Destroy and free relayd socket pair object.
 *
 * Removes the relayd from the global hash table; the sockets are closed and
 * the memory freed later via the free_relayd_rcu() callback. Safe to call
 * with NULL or with a relayd already being destroyed (the failed ht_del is
 * treated as "someone else is tearing it down").
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == nullptr) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd is being or is destroyed */
		return;
	}

	/* RCU free() call */
	call_rcu(&relayd->node.head, free_relayd_rcu);
}
352
/*
 * Remove a channel from the global list protected by a mutex. This function is
 * also responsible for freeing its data structures.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_iter iter;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	/* Lock ordering: global consumer data lock first, then the channel lock. */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Destroy streams that might have been left in the stream list. */
	clean_channel_stream_list(channel);

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}
	if (channel->monitor_timer_enabled == 1) {
		consumer_timer_monitor_stop(channel);
	}

	/*
	 * Send a last buffer statistics sample to the session daemon
	 * to ensure it tracks the amount of data consumed by this channel.
	 */
	sample_and_send_channel_buffer_stats(channel);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
		goto end;
	}

	/* Release the channel's reference on its current trace chunk. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = nullptr;

	if (channel->is_published) {
		int ret;

		lttng::urcu::read_lock_guard read_lock;
		iter.iter.node = &channel->node.node;
		ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
		LTTNG_ASSERT(!ret);

		iter.iter.node = &channel->channels_by_session_id_ht_node.node;
		ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
		LTTNG_ASSERT(!ret);
	}

	/* Logical deletion; memory is reclaimed after an RCU grace period. */
	channel->is_deleted = true;
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
417
228b5bf7
DG
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table.
 */
static void cleanup_relayd_ht()
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	{
		/* Scoped so the read lock is released before destroying the ht. */
		lttng::urcu::read_lock_guard read_lock;

		cds_lfht_for_each_entry (
			the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
			consumer_destroy_relayd(relayd);
		}
	}

	lttng_ht_destroy(the_consumer_data.relayd_ht);
}
438
8994307f
DG
/*
 * Update the end point status of all streams having the given network sequence
 * index (relayd index).
 *
 * It's atomically set without having the stream mutex locked which is fine
 * because we handle the write/read race with a pipe wakeup for each thread.
 */
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
					     enum consumer_endpoint_status status)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);

	lttng::urcu::read_lock_guard read_lock;

	/* Let's begin with metadata */
	cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			/* Unblock any waiter on this channel's metadata push. */
			stream->chan->metadata_pushed_wait_queue.wake_all();

			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
		}
	}

	/* Follow up by the data streams */
	cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to data stream %d", stream->wait_fd);
		}
	}
}
474
/*
 * Cleanup a relayd object by flagging every associated streams for deletion,
 * destroying the object meaning removing it from the relayd hash table,
 * closing the sockets and freeing the memory in a RCU call.
 *
 * If a local data context is available, notify the threads that the streams'
 * state have changed.
 */
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
	uint64_t netidx;

	LTTNG_ASSERT(relayd);

	DBG("Cleaning up relayd object ID %" PRIu64, relayd->net_seq_idx);

	/* Save the net sequence index before destroying the object */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
	notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
512
a6ba4fe1
DG
/*
 * Flag a relayd socket pair for destruction. Destroy it if the refcount
 * reaches zero.
 *
 * RCU read side lock MUST be aquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	LTTNG_ASSERT(relayd);
	ASSERT_RCU_READ_LOCKED();

	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}
532
/*
 * Completly destroy stream from every visiable data structure and the given
 * hash table if one.
 *
 * One this call returns, the stream object is not longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
543
5ab66908
MD
/*
 * XXX naming of del vs destroy is all mixed up.
 */
/* Destroy a data stream, removing it from the global data stream hash table. */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}

/* Destroy a metadata stream, removing it from the global metadata hash table. */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
556
28ab034a
JG
/*
 * Snapshot the channel's tracefile size into the stream's read-only copy of
 * the channel attributes.
 */
void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
					       struct lttng_consumer_channel *channel)
{
	stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
}
562
3bd1e081
MD
/*
 * Add a stream to the global list protected by a mutex.
 */
void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	/* Lock ordering: consumer data, channel, channel timer, then stream. */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	lttng::urcu::read_lock_guard read_lock;

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	the_consumer_data.stream_count++;
	the_consumer_data.need_update = 1;

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
617
/*
 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
 * be acquired before calling this.
 *
 * If a relayd with the same net sequence index already exists, the insertion
 * is silently skipped. Always returns 0.
 */
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret = 0;
	struct lttng_ht_node_u64 *node;
	struct lttng_ht_iter iter;

	LTTNG_ASSERT(relayd);
	ASSERT_RCU_READ_LOCKED();

	lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != nullptr) {
		/* An entry with that index already exists; keep it. */
		goto end;
	}
	lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);

end:
	return ret;
}
641
642/*
643 * Allocate and return a consumer relayd socket.
644 */
28ab034a 645static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
00e2e675 646{
cd9adb8b 647 struct consumer_relayd_sock_pair *obj = nullptr;
00e2e675 648
da009f2c
MD
649 /* net sequence index of -1 is a failure */
650 if (net_seq_idx == (uint64_t) -1ULL) {
00e2e675
DG
651 goto error;
652 }
653
64803277 654 obj = zmalloc<consumer_relayd_sock_pair>();
cd9adb8b 655 if (obj == nullptr) {
00e2e675
DG
656 PERROR("zmalloc relayd sock");
657 goto error;
658 }
659
660 obj->net_seq_idx = net_seq_idx;
661 obj->refcount = 0;
173af62f 662 obj->destroy_flag = 0;
f96e4545
MD
663 obj->control_sock.sock.fd = -1;
664 obj->data_sock.sock.fd = -1;
d88aee68 665 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
cd9adb8b 666 pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
00e2e675
DG
667
668error:
669 return obj;
670}
671
/*
 * Find a relayd socket pair in the global consumer data.
 *
 * Return the object if found else NULL.
 * RCU read-side lock must be held across this call and while using the
 * returned object.
 */
struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct consumer_relayd_sock_pair *relayd = nullptr;

	ASSERT_RCU_READ_LOCKED();

	/* Negative keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		goto error;
	}

	lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != nullptr) {
		relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
	}

error:
	return relayd;
}
701
10a50311
JD
/*
 * Find a relayd and send the stream
 *
 * Returns 0 on success, < 0 on error
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(stream->net_seq_idx != -1ULL);
	LTTNG_ASSERT(path);

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != nullptr) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock,
					stream->name,
					get_consumer_domain(),
					path,
					&stream->relayd_stream_id,
					stream->chan->tracefile_size,
					stream->chan->tracefile_count,
					stream->trace_chunk);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			/* The relayd is unusable; tear it down entirely. */
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}

		/* The relayd now holds a reference on this stream. */
		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
		    stream->key,
		    stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
	    stream->name,
	    stream->key,
	    stream->net_seq_idx);

end:
	return ret;
}
756
a4baae1b
JD
/*
 * Find a relayd and send the streams sent message
 *
 * Returns 0 on success, < 0 on error
 */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	LTTNG_ASSERT(net_seq_idx != -1ULL);

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd != nullptr) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_streams_sent(&relayd->control_sock);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			/* The relayd is unusable; tear it down entirely. */
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}
	} else {
		ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
		ret = -1;
		goto end;
	}

	ret = 0;
	DBG("All streams sent relayd id %" PRIu64, net_seq_idx);

end:
	return ret;
}
795
10a50311
JD
/*
 * Find a relayd and close the stream
 */
void close_relayd_stream(struct lttng_consumer_stream *stream)
{
	struct consumer_relayd_sock_pair *relayd;

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd) {
		consumer_stream_relayd_close(stream, relayd);
	}
}
810
00e2e675
DG
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
				      size_t data_size,
				      unsigned long padding,
				      struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information; multi-byte fields in big endian. */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);

		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be substracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
871
b1316da1
JG
/*
 * Write a character on the metadata poll pipe to wake the metadata thread.
 * Returns 0 on success, -1 on error.
 */
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
{
	int ret = 0;

	DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
	if (channel->monitor && channel->metadata_stream) {
		const char dummy = 'c';
		const ssize_t write_ret =
			lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);

		if (write_ret < 1) {
			if (errno == EWOULDBLOCK) {
				/*
				 * This is fine, the metadata poll thread
				 * is having a hard time keeping-up, but
				 * it will eventually wake-up and consume
				 * the available data.
				 */
				ret = 0;
			} else {
				PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
				ret = -1;
				goto end;
			}
		}
	}

end:
	return ret;
}
906
d2956687
JG
/*
 * Trigger a dump of the metadata content. Following/during the succesful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 */
static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
	int ret;

	ASSERT_LOCKED(stream->chan->lock);
	ASSERT_LOCKED(stream->lock);
	LTTNG_ASSERT(stream->metadata_flag);
	LTTNG_ASSERT(stream->chan->trace_chunk);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Reset the position of what has been read from the
		 * metadata cache to 0 so we can dump it again.
		 */
		ret = kernctl_metadata_cache_dump(stream->wait_fd);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Reset the position pushed from the metadata cache so it
		 * will write from the beginning on the next push.
		 */
		stream->ust_metadata_pushed = 0;
		ret = consumer_metadata_wakeup_pipe(stream->chan);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	if (ret < 0) {
		ERR("Failed to dump the metadata cache");
	}
	return ret;
}
949
28ab034a
JG
950static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
951 struct lttng_trace_chunk *new_trace_chunk)
d2956687 952{
d2956687 953 pthread_mutex_lock(&channel->lock);
b6921a17
JG
954 if (channel->is_deleted) {
955 /*
956 * The channel has been logically deleted and should no longer
957 * be used. It has released its reference to its current trace
958 * chunk and should not acquire a new one.
959 *
960 * Return success as there is nothing for the caller to do.
961 */
962 goto end;
963 }
d2956687
JG
964
965 /*
966 * The acquisition of the reference cannot fail (barring
967 * a severe internal error) since a reference to the published
968 * chunk is already held by the caller.
969 */
970 if (new_trace_chunk) {
28ab034a 971 const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);
d2956687 972
a0377dfe 973 LTTNG_ASSERT(acquired_reference);
d2956687
JG
974 }
975
976 lttng_trace_chunk_put(channel->trace_chunk);
977 channel->trace_chunk = new_trace_chunk;
d2956687
JG
978end:
979 pthread_mutex_unlock(&channel->lock);
ce1aa6fe 980 return 0;
d2956687
JG
981}
982
3bd1e081 983/*
ffe60014
DG
984 * Allocate and return a new lttng_consumer_channel object using the given key
985 * to initialize the hash table node.
986 *
987 * On error, return NULL.
3bd1e081 988 */
886224ff 989struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
28ab034a
JG
990 uint64_t session_id,
991 const uint64_t *chunk_id,
992 const char *pathname,
993 const char *name,
994 uint64_t relayd_id,
995 enum lttng_event_output output,
996 uint64_t tracefile_size,
997 uint64_t tracefile_count,
998 uint64_t session_id_per_pid,
999 unsigned int monitor,
1000 unsigned int live_timer_interval,
1001 bool is_in_live_session,
1002 const char *root_shm_path,
1003 const char *shm_path)
3bd1e081 1004{
cd9adb8b
JG
1005 struct lttng_consumer_channel *channel = nullptr;
1006 struct lttng_trace_chunk *trace_chunk = nullptr;
d2956687
JG
1007
1008 if (chunk_id) {
1009 trace_chunk = lttng_trace_chunk_registry_find_chunk(
28ab034a 1010 the_consumer_data.chunk_registry, session_id, *chunk_id);
d2956687
JG
1011 if (!trace_chunk) {
1012 ERR("Failed to find trace chunk reference during creation of channel");
1013 goto end;
1014 }
1015 }
3bd1e081 1016
32670d71
JG
1017 try {
1018 channel = new lttng_consumer_channel;
1019 } catch (const std::bad_alloc& e) {
1020 ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
1021 channel = nullptr;
3bd1e081
MD
1022 goto end;
1023 }
ffe60014
DG
1024
1025 channel->key = key;
3bd1e081 1026 channel->refcount = 0;
ffe60014 1027 channel->session_id = session_id;
1950109e 1028 channel->session_id_per_pid = session_id_per_pid;
ffe60014 1029 channel->relayd_id = relayd_id;
1624d5b7
JD
1030 channel->tracefile_size = tracefile_size;
1031 channel->tracefile_count = tracefile_count;
2bba9e53 1032 channel->monitor = monitor;
ecc48a90 1033 channel->live_timer_interval = live_timer_interval;
a2814ea7 1034 channel->is_live = is_in_live_session;
f40b76ae
JG
1035 pthread_mutex_init(&channel->lock, NULL);
1036 pthread_mutex_init(&channel->timer_lock, NULL);
ffe60014 1037
0c759fc9
DG
1038 switch (output) {
1039 case LTTNG_EVENT_SPLICE:
1040 channel->output = CONSUMER_CHANNEL_SPLICE;
1041 break;
1042 case LTTNG_EVENT_MMAP:
1043 channel->output = CONSUMER_CHANNEL_MMAP;
1044 break;
1045 default:
a0377dfe 1046 abort();
32670d71 1047 delete channel;
cd9adb8b 1048 channel = nullptr;
0c759fc9
DG
1049 goto end;
1050 }
1051
07b86b52
JD
1052 /*
1053 * In monitor mode, the streams associated with the channel will be put in
1054 * a special list ONLY owned by this channel. So, the refcount is set to 1
1055 * here meaning that the channel itself has streams that are referenced.
1056 *
1057 * On a channel deletion, once the channel is no longer visible, the
1058 * refcount is decremented and checked for a zero value to delete it. With
1059 * streams in no monitor mode, it will now be safe to destroy the channel.
1060 */
1061 if (!channel->monitor) {
1062 channel->refcount = 1;
1063 }
1064
ffe60014
DG
1065 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
1066 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
1067
1068 strncpy(channel->name, name, sizeof(channel->name));
1069 channel->name[sizeof(channel->name) - 1] = '\0';
1070
3d071855
MD
1071 if (root_shm_path) {
1072 strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
1073 channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
1074 }
d7ba1388
MD
1075 if (shm_path) {
1076 strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
1077 channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
1078 }
1079
d88aee68 1080 lttng_ht_node_init_u64(&channel->node, channel->key);
28ab034a 1081 lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);
d8ef542d
MD
1082
1083 channel->wait_fd = -1;
ffe60014
DG
1084 CDS_INIT_LIST_HEAD(&channel->streams.head);
1085
d2956687 1086 if (trace_chunk) {
28ab034a 1087 int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
d2956687
JG
1088 if (ret) {
1089 goto error;
1090 }
1091 }
1092
62a7b8ed 1093 DBG("Allocated channel (key %" PRIu64 ")", channel->key);
3bd1e081 1094
3bd1e081 1095end:
d2956687 1096 lttng_trace_chunk_put(trace_chunk);
3bd1e081 1097 return channel;
d2956687
JG
1098error:
1099 consumer_del_channel(channel);
cd9adb8b 1100 channel = nullptr;
d2956687 1101 goto end;
3bd1e081
MD
1102}
1103
1104/*
1105 * Add a channel to the global list protected by a mutex.
821fffb2 1106 *
b5a6470f 1107 * Always return 0 indicating success.
3bd1e081 1108 */
d8ef542d 1109int consumer_add_channel(struct lttng_consumer_channel *channel,
28ab034a 1110 struct lttng_consumer_local_data *ctx)
3bd1e081 1111{
fa29bfbf 1112 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 1113 pthread_mutex_lock(&channel->lock);
ec6ea7d0 1114 pthread_mutex_lock(&channel->timer_lock);
c77fc10a 1115
b5a6470f
DG
1116 /*
1117 * This gives us a guarantee that the channel we are about to add to the
1118 * channel hash table will be unique. See this function comment on the why
1119 * we need to steel the channel key at this stage.
1120 */
1121 steal_channel_key(channel->key);
c77fc10a 1122
56047f5a 1123 lttng::urcu::read_lock_guard read_lock;
fa29bfbf
SM
1124 lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
1125 lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
28ab034a 1126 &channel->channels_by_session_id_ht_node);
d2956687 1127 channel->is_published = true;
b5a6470f 1128
ec6ea7d0 1129 pthread_mutex_unlock(&channel->timer_lock);
a9838785 1130 pthread_mutex_unlock(&channel->lock);
fa29bfbf 1131 pthread_mutex_unlock(&the_consumer_data.lock);
702b1ea4 1132
b5a6470f 1133 if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
a0cbdd2e 1134 notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
d8ef542d 1135 }
b5a6470f
DG
1136
1137 return 0;
3bd1e081
MD
1138}
1139
1140/*
1141 * Allocate the pollfd structure and the local view of the out fds to avoid
1142 * doing a lookup in the linked list and concurrency issues when writing is
1143 * needed. Called with consumer_data.lock held.
1144 *
1145 * Returns the number of fds in the structures.
1146 */
ffe60014 1147static int update_poll_array(struct lttng_consumer_local_data *ctx,
28ab034a
JG
1148 struct pollfd **pollfd,
1149 struct lttng_consumer_stream **local_stream,
1150 struct lttng_ht *ht,
1151 int *nb_inactive_fd)
3bd1e081 1152{
3bd1e081 1153 int i = 0;
e4421fec
DG
1154 struct lttng_ht_iter iter;
1155 struct lttng_consumer_stream *stream;
3bd1e081 1156
a0377dfe
FD
1157 LTTNG_ASSERT(ctx);
1158 LTTNG_ASSERT(ht);
1159 LTTNG_ASSERT(pollfd);
1160 LTTNG_ASSERT(local_stream);
ffe60014 1161
3bd1e081 1162 DBG("Updating poll fd array");
9a2fcf78 1163 *nb_inactive_fd = 0;
56047f5a
JG
1164
1165 {
1166 lttng::urcu::read_lock_guard read_lock;
1167 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1168 /*
1169 * Only active streams with an active end point can be added to the
1170 * poll set and local stream storage of the thread.
1171 *
1172 * There is a potential race here for endpoint_status to be updated
1173 * just after the check. However, this is OK since the stream(s) will
1174 * be deleted once the thread is notified that the end point state has
1175 * changed where this function will be called back again.
1176 *
1177 * We track the number of inactive FDs because they still need to be
1178 * closed by the polling thread after a wakeup on the data_pipe or
1179 * metadata_pipe.
1180 */
1181 if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
1182 (*nb_inactive_fd)++;
1183 continue;
1184 }
1185
1186 (*pollfd)[i].fd = stream->wait_fd;
1187 (*pollfd)[i].events = POLLIN | POLLPRI;
1188 local_stream[i] = stream;
1189 i++;
3bd1e081 1190 }
3bd1e081
MD
1191 }
1192
1193 /*
50f8ae69 1194 * Insert the consumer_data_pipe at the end of the array and don't
3bd1e081
MD
1195 * increment i so nb_fd is the number of real FD.
1196 */
acdb9057 1197 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
509bb1cf 1198 (*pollfd)[i].events = POLLIN | POLLPRI;
02b3d176
DG
1199
1200 (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
1201 (*pollfd)[i + 1].events = POLLIN | POLLPRI;
3bd1e081
MD
1202 return i;
1203}
1204
1205/*
84382d49
MD
1206 * Poll on the should_quit pipe and the command socket return -1 on
1207 * error, 1 if should exit, 0 if data is available on the command socket
3bd1e081
MD
1208 */
1209int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1210{
1211 int num_rdy;
1212
88f2b785 1213restart:
3bd1e081
MD
1214 num_rdy = poll(consumer_sockpoll, 2, -1);
1215 if (num_rdy == -1) {
88f2b785
MD
1216 /*
1217 * Restart interrupted system call.
1218 */
1219 if (errno == EINTR) {
1220 goto restart;
1221 }
7a57cf92 1222 PERROR("Poll error");
84382d49 1223 return -1;
3bd1e081 1224 }
509bb1cf 1225 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
3bd1e081 1226 DBG("consumer_should_quit wake up");
84382d49 1227 return 1;
3bd1e081
MD
1228 }
1229 return 0;
3bd1e081
MD
1230}
1231
1232/*
1233 * Set the error socket.
1234 */
28ab034a 1235void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
3bd1e081
MD
1236{
1237 ctx->consumer_error_socket = sock;
1238}
1239
1240/*
1241 * Set the command socket path.
1242 */
28ab034a 1243void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
3bd1e081
MD
1244{
1245 ctx->consumer_command_sock_path = sock;
1246}
1247
1248/*
1249 * Send return code to the session daemon.
1250 * If the socket is not defined, we return 0, it is not a fatal error
1251 */
fbd566c2
JG
1252int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
1253 enum lttcomm_return_code error_code)
3bd1e081
MD
1254{
1255 if (ctx->consumer_error_socket > 0) {
fbd566c2
JG
1256 const std::int32_t comm_code = std::int32_t(error_code);
1257
1258 static_assert(
1259 sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
1260 "Fixed-size communication type too small to accomodate lttcomm_return_code");
28ab034a 1261 return lttcomm_send_unix_sock(
fbd566c2 1262 ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
3bd1e081
MD
1263 }
1264
1265 return 0;
1266}
1267
1268/*
228b5bf7
DG
1269 * Close all the tracefiles and stream fds and MUST be called when all
1270 * instances are destroyed i.e. when all threads were joined and are ended.
3bd1e081 1271 */
cd9adb8b 1272void lttng_consumer_cleanup()
3bd1e081 1273{
e4421fec 1274 struct lttng_ht_iter iter;
ffe60014 1275 struct lttng_consumer_channel *channel;
e10aec8f 1276 unsigned int trace_chunks_left;
6065ceec 1277
56047f5a
JG
1278 {
1279 lttng::urcu::read_lock_guard read_lock;
3bd1e081 1280
56047f5a
JG
1281 cds_lfht_for_each_entry (
1282 the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
1283 consumer_del_channel(channel);
1284 }
3bd1e081 1285 }
6065ceec 1286
fa29bfbf
SM
1287 lttng_ht_destroy(the_consumer_data.channel_ht);
1288 lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
228b5bf7
DG
1289
1290 cleanup_relayd_ht();
1291
fa29bfbf 1292 lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);
d8ef542d 1293
228b5bf7
DG
1294 /*
1295 * This HT contains streams that are freed by either the metadata thread or
1296 * the data thread so we do *nothing* on the hash table and simply destroy
1297 * it.
1298 */
fa29bfbf 1299 lttng_ht_destroy(the_consumer_data.stream_list_ht);
28cc88f3 1300
e10aec8f
MD
1301 /*
1302 * Trace chunks in the registry may still exist if the session
1303 * daemon has encountered an internal error and could not
1304 * tear down its sessions and/or trace chunks properly.
1305 *
1306 * Release the session daemon's implicit reference to any remaining
1307 * trace chunk and print an error if any trace chunk was found. Note
1308 * that there are _no_ legitimate cases for trace chunks to be left,
1309 * it is a leak. However, it can happen following a crash of the
1310 * session daemon and not emptying the registry would cause an assertion
1311 * to hit.
1312 */
28ab034a
JG
1313 trace_chunks_left =
1314 lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
e10aec8f
MD
1315 if (trace_chunks_left) {
1316 ERR("%u trace chunks are leaked by lttng-consumerd. "
28ab034a
JG
1317 "This can be caused by an internal error of the session daemon.",
1318 trace_chunks_left);
e10aec8f
MD
1319 }
1320 /* Run all callbacks freeing each chunk. */
1321 rcu_barrier();
fa29bfbf 1322 lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
3bd1e081
MD
1323}
1324
1325/*
1326 * Called from signal handler.
1327 */
1328void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1329{
6cd525e8
MD
1330 ssize_t ret;
1331
10211f5c 1332 CMM_STORE_SHARED(consumer_quit, 1);
6cd525e8
MD
1333 ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
1334 if (ret < 1) {
7a57cf92 1335 PERROR("write consumer quit");
3bd1e081 1336 }
ab1027f4
DG
1337
1338 DBG("Consumer flag that it should quit");
3bd1e081
MD
1339}
1340
5199ffc4
JG
1341/*
1342 * Flush pending writes to trace output disk file.
1343 */
28ab034a 1344static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
3bd1e081
MD
1345{
1346 int outfd = stream->out_fd;
1347
1348 /*
1349 * This does a blocking write-and-wait on any page that belongs to the
1350 * subbuffer prior to the one we just wrote.
1351 * Don't care about error values, as these are just hints and ways to
1352 * limit the amount of page cache used.
1353 */
ffe60014 1354 if (orig_offset < stream->max_sb_size) {
3bd1e081
MD
1355 return;
1356 }
671e39d7
MJ
1357 lttng::io::hint_flush_range_dont_need_sync(
1358 outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
3bd1e081
MD
1359}
1360
1361/*
1362 * Initialise the necessary environnement :
1363 * - create a new context
1364 * - create the poll_pipe
1365 * - create the should_quit pipe (for signal handler)
1366 * - create the thread pipe (for splice)
1367 *
1368 * Takes a function pointer as argument, this function is called when data is
1369 * available on a buffer. This function is responsible to do the
1370 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1371 * buffer configuration and then kernctl_put_next_subbuf at the end.
1372 *
1373 * Returns a pointer to the new context or NULL on error.
1374 */
28ab034a
JG
1375struct lttng_consumer_local_data *
1376lttng_consumer_create(enum lttng_consumer_type type,
1377 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
1378 struct lttng_consumer_local_data *ctx,
1379 bool locked_by_caller),
1380 int (*recv_channel)(struct lttng_consumer_channel *channel),
1381 int (*recv_stream)(struct lttng_consumer_stream *stream),
1382 int (*update_stream)(uint64_t stream_key, uint32_t state))
3bd1e081 1383{
d8ef542d 1384 int ret;
3bd1e081
MD
1385 struct lttng_consumer_local_data *ctx;
1386
a0377dfe 1387 LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
28ab034a 1388 the_consumer_data.type == type);
fa29bfbf 1389 the_consumer_data.type = type;
3bd1e081 1390
64803277 1391 ctx = zmalloc<lttng_consumer_local_data>();
cd9adb8b 1392 if (ctx == nullptr) {
7a57cf92 1393 PERROR("allocating context");
3bd1e081
MD
1394 goto error;
1395 }
1396
1397 ctx->consumer_error_socket = -1;
331744e3 1398 ctx->consumer_metadata_socket = -1;
cd9adb8b 1399 pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
3bd1e081
MD
1400 /* assign the callbacks */
1401 ctx->on_buffer_ready = buffer_ready;
1402 ctx->on_recv_channel = recv_channel;
1403 ctx->on_recv_stream = recv_stream;
1404 ctx->on_update_stream = update_stream;
1405
acdb9057
DG
1406 ctx->consumer_data_pipe = lttng_pipe_open(0);
1407 if (!ctx->consumer_data_pipe) {
3bd1e081
MD
1408 goto error_poll_pipe;
1409 }
1410
02b3d176
DG
1411 ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
1412 if (!ctx->consumer_wakeup_pipe) {
1413 goto error_wakeup_pipe;
1414 }
1415
3bd1e081
MD
1416 ret = pipe(ctx->consumer_should_quit);
1417 if (ret < 0) {
7a57cf92 1418 PERROR("Error creating recv pipe");
3bd1e081
MD
1419 goto error_quit_pipe;
1420 }
1421
d8ef542d
MD
1422 ret = pipe(ctx->consumer_channel_pipe);
1423 if (ret < 0) {
1424 PERROR("Error creating channel pipe");
1425 goto error_channel_pipe;
1426 }
1427
13886d2d
DG
1428 ctx->consumer_metadata_pipe = lttng_pipe_open(0);
1429 if (!ctx->consumer_metadata_pipe) {
fb3a43a9
DG
1430 goto error_metadata_pipe;
1431 }
3bd1e081 1432
e9404c27
JG
1433 ctx->channel_monitor_pipe = -1;
1434
fb3a43a9 1435 return ctx;
3bd1e081 1436
fb3a43a9 1437error_metadata_pipe:
d8ef542d
MD
1438 utils_close_pipe(ctx->consumer_channel_pipe);
1439error_channel_pipe:
d8ef542d 1440 utils_close_pipe(ctx->consumer_should_quit);
3bd1e081 1441error_quit_pipe:
02b3d176
DG
1442 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1443error_wakeup_pipe:
acdb9057 1444 lttng_pipe_destroy(ctx->consumer_data_pipe);
3bd1e081
MD
1445error_poll_pipe:
1446 free(ctx);
1447error:
cd9adb8b 1448 return nullptr;
3bd1e081
MD
1449}
1450
282dadbc
MD
1451/*
1452 * Iterate over all streams of the hashtable and free them properly.
1453 */
1454static void destroy_data_stream_ht(struct lttng_ht *ht)
1455{
1456 struct lttng_ht_iter iter;
1457 struct lttng_consumer_stream *stream;
1458
cd9adb8b 1459 if (ht == nullptr) {
282dadbc
MD
1460 return;
1461 }
1462
56047f5a
JG
1463 {
1464 lttng::urcu::read_lock_guard read_lock;
1465 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1466 /*
1467 * Ignore return value since we are currently cleaning up so any error
1468 * can't be handled.
1469 */
1470 (void) consumer_del_stream(stream, ht);
1471 }
282dadbc 1472 }
282dadbc
MD
1473
1474 lttng_ht_destroy(ht);
1475}
1476
1477/*
1478 * Iterate over all streams of the metadata hashtable and free them
1479 * properly.
1480 */
1481static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1482{
1483 struct lttng_ht_iter iter;
1484 struct lttng_consumer_stream *stream;
1485
cd9adb8b 1486 if (ht == nullptr) {
282dadbc
MD
1487 return;
1488 }
1489
56047f5a
JG
1490 {
1491 lttng::urcu::read_lock_guard read_lock;
1492 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1493 /*
1494 * Ignore return value since we are currently cleaning up so any error
1495 * can't be handled.
1496 */
1497 (void) consumer_del_metadata_stream(stream, ht);
1498 }
282dadbc 1499 }
282dadbc
MD
1500
1501 lttng_ht_destroy(ht);
1502}
1503
3bd1e081
MD
1504/*
1505 * Close all fds associated with the instance and free the context.
1506 */
1507void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1508{
4c462e79
MD
1509 int ret;
1510
ab1027f4
DG
1511 DBG("Consumer destroying it. Closing everything.");
1512
4f2e75b9
DG
1513 if (!ctx) {
1514 return;
1515 }
1516
282dadbc
MD
1517 destroy_data_stream_ht(data_ht);
1518 destroy_metadata_stream_ht(metadata_ht);
1519
4c462e79
MD
1520 ret = close(ctx->consumer_error_socket);
1521 if (ret) {
1522 PERROR("close");
1523 }
331744e3
JD
1524 ret = close(ctx->consumer_metadata_socket);
1525 if (ret) {
1526 PERROR("close");
1527 }
d8ef542d 1528 utils_close_pipe(ctx->consumer_channel_pipe);
acdb9057 1529 lttng_pipe_destroy(ctx->consumer_data_pipe);
13886d2d 1530 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
02b3d176 1531 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
d8ef542d 1532 utils_close_pipe(ctx->consumer_should_quit);
fb3a43a9 1533
3bd1e081
MD
1534 unlink(ctx->consumer_command_sock_path);
1535 free(ctx);
1536}
1537
6197aea7
DG
1538/*
1539 * Write the metadata stream id on the specified file descriptor.
1540 */
28ab034a
JG
1541static int
1542write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
6197aea7 1543{
6cd525e8 1544 ssize_t ret;
1d4dfdef 1545 struct lttcomm_relayd_metadata_payload hdr;
6197aea7 1546
1d4dfdef
DG
1547 hdr.stream_id = htobe64(stream->relayd_stream_id);
1548 hdr.padding_size = htobe32(padding);
6cd525e8
MD
1549 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1550 if (ret < sizeof(hdr)) {
d7b75ec8 1551 /*
6f04ed72 1552 * This error means that the fd's end is closed so ignore the PERROR
d7b75ec8
DG
1553 * not to clubber the error output since this can happen in a normal
1554 * code path.
1555 */
1556 if (errno != EPIPE) {
1557 PERROR("write metadata stream id");
1558 }
1559 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
534d2592
DG
1560 /*
1561 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1562 * handle writting the missing part so report that as an error and
1563 * don't lie to the caller.
1564 */
1565 ret = -1;
6197aea7
DG
1566 goto end;
1567 }
1d4dfdef 1568 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
28ab034a
JG
1569 stream->relayd_stream_id,
1570 padding);
6197aea7
DG
1571
1572end:
6cd525e8 1573 return (int) ret;
6197aea7
DG
1574}
1575
3bd1e081 1576/*
09e26845
DG
1577 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1578 * core function for writing trace buffers to either the local filesystem or
1579 * the network.
1580 *
d2956687 1581 * It must be called with the stream and the channel lock held.
79d4ffb7 1582 *
09e26845 1583 * Careful review MUST be put if any changes occur!
3bd1e081
MD
1584 *
1585 * Returns the number of bytes written
1586 */
28ab034a
JG
1587ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
1588 const struct lttng_buffer_view *buffer,
1589 unsigned long padding)
3bd1e081 1590{
994ab360 1591 ssize_t ret = 0;
f02e1e8a
DG
1592 off_t orig_offset = stream->out_fd_offset;
1593 /* Default is on the disk */
1594 int outfd = stream->out_fd;
cd9adb8b 1595 struct consumer_relayd_sock_pair *relayd = nullptr;
8994307f 1596 unsigned int relayd_hang_up = 0;
fd424d99
JG
1597 const size_t subbuf_content_size = buffer->size - padding;
1598 size_t write_len;
f02e1e8a
DG
1599
1600 /* RCU lock for the relayd pointer */
56047f5a 1601 lttng::urcu::read_lock_guard read_lock;
28ab034a 1602 LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
d2956687 1603
f02e1e8a 1604 /* Flag that the current stream if set for network streaming. */
da009f2c 1605 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1606 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1607 if (relayd == nullptr) {
56591bac 1608 ret = -EPIPE;
f02e1e8a
DG
1609 goto end;
1610 }
1611 }
1612
f02e1e8a
DG
1613 /* Handle stream on the relayd if the output is on the network */
1614 if (relayd) {
fd424d99 1615 unsigned long netlen = subbuf_content_size;
f02e1e8a
DG
1616
1617 /*
1618 * Lock the control socket for the complete duration of the function
1619 * since from this point on we will use the socket.
1620 */
1621 if (stream->metadata_flag) {
1622 /* Metadata requires the control socket. */
1623 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
93ec662e
JD
1624 if (stream->reset_metadata_flag) {
1625 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1626 stream->relayd_stream_id,
1627 stream->metadata_version);
93ec662e
JD
1628 if (ret < 0) {
1629 relayd_hang_up = 1;
1630 goto write_error;
1631 }
1632 stream->reset_metadata_flag = 0;
1633 }
1d4dfdef 1634 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
f02e1e8a
DG
1635 }
1636
1d4dfdef 1637 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
994ab360
DG
1638 if (ret < 0) {
1639 relayd_hang_up = 1;
1640 goto write_error;
1641 }
1642 /* Use the returned socket. */
1643 outfd = ret;
f02e1e8a 1644
994ab360
DG
1645 /* Write metadata stream id before payload */
1646 if (stream->metadata_flag) {
239f61af 1647 ret = write_relayd_metadata_id(outfd, stream, padding);
994ab360 1648 if (ret < 0) {
8994307f
DG
1649 relayd_hang_up = 1;
1650 goto write_error;
1651 }
f02e1e8a 1652 }
1624d5b7 1653
fd424d99
JG
1654 write_len = subbuf_content_size;
1655 } else {
1656 /* No streaming; we have to write the full padding. */
93ec662e
JD
1657 if (stream->metadata_flag && stream->reset_metadata_flag) {
1658 ret = utils_truncate_stream_file(stream->out_fd, 0);
1659 if (ret < 0) {
1660 ERR("Reset metadata file");
1661 goto end;
1662 }
1663 stream->reset_metadata_flag = 0;
1664 }
1665
1624d5b7
JD
1666 /*
1667 * Check if we need to change the tracefile before writing the packet.
1668 */
1669 if (stream->chan->tracefile_size > 0 &&
28ab034a
JG
1670 (stream->tracefile_size_current + buffer->size) >
1671 stream->chan->tracefile_size) {
d2956687
JG
1672 ret = consumer_stream_rotate_output_files(stream);
1673 if (ret) {
1624d5b7
JD
1674 goto end;
1675 }
309167d2 1676 outfd = stream->out_fd;
a1ae300f 1677 orig_offset = 0;
1624d5b7 1678 }
fd424d99 1679 stream->tracefile_size_current += buffer->size;
fd424d99 1680 write_len = buffer->size;
f02e1e8a
DG
1681 }
1682
d02b8372
DG
1683 /*
1684 * This call guarantee that len or less is returned. It's impossible to
1685 * receive a ret value that is bigger than len.
1686 */
fd424d99 1687 ret = lttng_write(outfd, buffer->data, write_len);
e2d1190b 1688 DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
fd424d99 1689 if (ret < 0 || ((size_t) ret != write_len)) {
d02b8372
DG
1690 /*
1691 * Report error to caller if nothing was written else at least send the
1692 * amount written.
1693 */
1694 if (ret < 0) {
994ab360 1695 ret = -errno;
f02e1e8a 1696 }
994ab360 1697 relayd_hang_up = 1;
f02e1e8a 1698
d02b8372 1699 /* Socket operation failed. We consider the relayd dead */
fcf0f774 1700 if (errno == EPIPE) {
d02b8372
DG
1701 /*
1702 * This is possible if the fd is closed on the other side
1703 * (outfd) or any write problem. It can be verbose a bit for a
1704 * normal execution if for instance the relayd is stopped
1705 * abruptly. This can happen so set this to a DBG statement.
1706 */
1707 DBG("Consumer mmap write detected relayd hang up");
994ab360
DG
1708 } else {
1709 /* Unhandled error, print it and stop function right now. */
28ab034a 1710 PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
f02e1e8a 1711 }
994ab360 1712 goto write_error;
d02b8372
DG
1713 }
1714 stream->output_written += ret;
d02b8372
DG
1715
1716 /* This call is useless on a socket so better save a syscall. */
1717 if (!relayd) {
1718 /* This won't block, but will start writeout asynchronously */
671e39d7 1719 lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
fd424d99 1720 stream->out_fd_offset += write_len;
f5dbe415 1721 lttng_consumer_sync_trace_file(stream, orig_offset);
f02e1e8a 1722 }
f02e1e8a 1723
8994307f
DG
1724write_error:
1725 /*
1726 * This is a special case that the relayd has closed its socket. Let's
1727 * cleanup the relayd object and all associated streams.
1728 */
1729 if (relayd && relayd_hang_up) {
28ab034a 1730 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1731 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1732 }
1733
f02e1e8a
DG
1734end:
1735 /* Unlock only if ctrl socket used */
1736 if (relayd && stream->metadata_flag) {
1737 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1738 }
1739
994ab360 1740 return ret;
3bd1e081
MD
1741}
1742
1743/*
1744 * Splice the data from the ring buffer to the tracefile.
1745 *
79d4ffb7
DG
1746 * It must be called with the stream lock held.
1747 *
3bd1e081
MD
1748 * Returns the number of bytes spliced.
1749 */
28ab034a
JG
1750ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
1751 struct lttng_consumer_stream *stream,
1752 unsigned long len,
1753 unsigned long padding)
3bd1e081 1754{
f02e1e8a
DG
1755 ssize_t ret = 0, written = 0, ret_splice = 0;
1756 loff_t offset = 0;
1757 off_t orig_offset = stream->out_fd_offset;
1758 int fd = stream->wait_fd;
1759 /* Default is on the disk */
1760 int outfd = stream->out_fd;
cd9adb8b 1761 struct consumer_relayd_sock_pair *relayd = nullptr;
fb3a43a9 1762 int *splice_pipe;
8994307f 1763 unsigned int relayd_hang_up = 0;
f02e1e8a 1764
fa29bfbf 1765 switch (the_consumer_data.type) {
3bd1e081 1766 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1767 break;
7753dea8
MD
1768 case LTTNG_CONSUMER32_UST:
1769 case LTTNG_CONSUMER64_UST:
f02e1e8a 1770 /* Not supported for user space tracing */
3bd1e081
MD
1771 return -ENOSYS;
1772 default:
1773 ERR("Unknown consumer_data type");
a0377dfe 1774 abort();
3bd1e081
MD
1775 }
1776
f02e1e8a 1777 /* RCU lock for the relayd pointer */
56047f5a 1778 lttng::urcu::read_lock_guard read_lock;
f02e1e8a
DG
1779
1780 /* Flag that the current stream if set for network streaming. */
da009f2c 1781 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1782 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1783 if (relayd == nullptr) {
ad0b0d23 1784 written = -ret;
f02e1e8a
DG
1785 goto end;
1786 }
1787 }
a2361a61 1788 splice_pipe = stream->splice_pipe;
fb3a43a9 1789
f02e1e8a 1790 /* Write metadata stream id before payload */
1d4dfdef 1791 if (relayd) {
ad0b0d23 1792 unsigned long total_len = len;
f02e1e8a 1793
1d4dfdef
DG
1794 if (stream->metadata_flag) {
1795 /*
1796 * Lock the control socket for the complete duration of the function
1797 * since from this point on we will use the socket.
1798 */
1799 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1800
93ec662e
JD
1801 if (stream->reset_metadata_flag) {
1802 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1803 stream->relayd_stream_id,
1804 stream->metadata_version);
93ec662e
JD
1805 if (ret < 0) {
1806 relayd_hang_up = 1;
1807 goto write_error;
1808 }
1809 stream->reset_metadata_flag = 0;
1810 }
28ab034a 1811 ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
1d4dfdef
DG
1812 if (ret < 0) {
1813 written = ret;
ad0b0d23
DG
1814 relayd_hang_up = 1;
1815 goto write_error;
1d4dfdef
DG
1816 }
1817
1818 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1819 }
1820
1821 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
ad0b0d23
DG
1822 if (ret < 0) {
1823 written = ret;
1824 relayd_hang_up = 1;
1825 goto write_error;
f02e1e8a 1826 }
ad0b0d23
DG
1827 /* Use the returned socket. */
1828 outfd = ret;
1d4dfdef
DG
1829 } else {
1830 /* No streaming, we have to set the len with the full padding */
1831 len += padding;
1624d5b7 1832
93ec662e
JD
1833 if (stream->metadata_flag && stream->reset_metadata_flag) {
1834 ret = utils_truncate_stream_file(stream->out_fd, 0);
1835 if (ret < 0) {
1836 ERR("Reset metadata file");
1837 goto end;
1838 }
1839 stream->reset_metadata_flag = 0;
1840 }
1624d5b7
JD
1841 /*
1842 * Check if we need to change the tracefile before writing the packet.
1843 */
1844 if (stream->chan->tracefile_size > 0 &&
28ab034a 1845 (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
d2956687 1846 ret = consumer_stream_rotate_output_files(stream);
1624d5b7 1847 if (ret < 0) {
ad0b0d23 1848 written = ret;
1624d5b7
JD
1849 goto end;
1850 }
309167d2 1851 outfd = stream->out_fd;
a1ae300f 1852 orig_offset = 0;
1624d5b7
JD
1853 }
1854 stream->tracefile_size_current += len;
f02e1e8a
DG
1855 }
1856
1857 while (len > 0) {
1d4dfdef 1858 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
28ab034a
JG
1859 (unsigned long) offset,
1860 len,
1861 fd,
1862 splice_pipe[1]);
1863 ret_splice = splice(
cd9adb8b 1864 fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
f02e1e8a
DG
1865 DBG("splice chan to pipe, ret %zd", ret_splice);
1866 if (ret_splice < 0) {
d02b8372 1867 ret = errno;
ad0b0d23 1868 written = -ret;
d02b8372 1869 PERROR("Error in relay splice");
f02e1e8a
DG
1870 goto splice_error;
1871 }
1872
1873 /* Handle stream on the relayd if the output is on the network */
ad0b0d23
DG
1874 if (relayd && stream->metadata_flag) {
1875 size_t metadata_payload_size =
1876 sizeof(struct lttcomm_relayd_metadata_payload);
1877
1878 /* Update counter to fit the spliced data */
1879 ret_splice += metadata_payload_size;
1880 len += metadata_payload_size;
1881 /*
1882 * We do this so the return value can match the len passed as
1883 * argument to this function.
1884 */
1885 written -= metadata_payload_size;
f02e1e8a
DG
1886 }
1887
1888 /* Splice data out */
28ab034a 1889 ret_splice = splice(splice_pipe[0],
cd9adb8b 1890 nullptr,
28ab034a 1891 outfd,
cd9adb8b 1892 nullptr,
28ab034a
JG
1893 ret_splice,
1894 SPLICE_F_MOVE | SPLICE_F_MORE);
1895 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
f02e1e8a 1896 if (ret_splice < 0) {
d02b8372 1897 ret = errno;
ad0b0d23
DG
1898 written = -ret;
1899 relayd_hang_up = 1;
1900 goto write_error;
f02e1e8a 1901 } else if (ret_splice > len) {
d02b8372
DG
1902 /*
1903 * We don't expect this code path to be executed but you never know
1904 * so this is an extra protection agains a buggy splice().
1905 */
f02e1e8a 1906 ret = errno;
ad0b0d23 1907 written += ret_splice;
28ab034a 1908 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
f02e1e8a 1909 goto splice_error;
d02b8372
DG
1910 } else {
1911 /* All good, update current len and continue. */
1912 len -= ret_splice;
f02e1e8a 1913 }
f02e1e8a
DG
1914
1915 /* This call is useless on a socket so better save a syscall. */
1916 if (!relayd) {
1917 /* This won't block, but will start writeout asynchronously */
671e39d7 1918 lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
f02e1e8a
DG
1919 stream->out_fd_offset += ret_splice;
1920 }
e5d1a9b3 1921 stream->output_written += ret_splice;
f02e1e8a
DG
1922 written += ret_splice;
1923 }
f5dbe415
JG
1924 if (!relayd) {
1925 lttng_consumer_sync_trace_file(stream, orig_offset);
1926 }
f02e1e8a
DG
1927 goto end;
1928
8994307f
DG
1929write_error:
1930 /*
1931 * This is a special case that the relayd has closed its socket. Let's
1932 * cleanup the relayd object and all associated streams.
1933 */
1934 if (relayd && relayd_hang_up) {
28ab034a 1935 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1936 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1937 /* Skip splice error so the consumer does not fail */
1938 goto end;
1939 }
1940
f02e1e8a
DG
1941splice_error:
1942 /* send the appropriate error description to sessiond */
1943 switch (ret) {
f02e1e8a 1944 case EINVAL:
f73fabfd 1945 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
1946 break;
1947 case ENOMEM:
f73fabfd 1948 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
1949 break;
1950 case ESPIPE:
f73fabfd 1951 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
1952 break;
1953 }
1954
1955end:
1956 if (relayd && stream->metadata_flag) {
1957 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1958 }
1959
f02e1e8a 1960 return written;
3bd1e081
MD
1961}
1962
15055ce5
JD
1963/*
1964 * Sample the snapshot positions for a specific fd
1965 *
1966 * Returns 0 on success, < 0 on error
1967 */
1968int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
1969{
fa29bfbf 1970 switch (the_consumer_data.type) {
15055ce5
JD
1971 case LTTNG_CONSUMER_KERNEL:
1972 return lttng_kconsumer_sample_snapshot_positions(stream);
1973 case LTTNG_CONSUMER32_UST:
1974 case LTTNG_CONSUMER64_UST:
1975 return lttng_ustconsumer_sample_snapshot_positions(stream);
1976 default:
1977 ERR("Unknown consumer_data type");
a0377dfe 1978 abort();
15055ce5
JD
1979 return -ENOSYS;
1980 }
1981}
3bd1e081
MD
1982/*
1983 * Take a snapshot for a specific fd
1984 *
1985 * Returns 0 on success, < 0 on error
1986 */
ffe60014 1987int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 1988{
fa29bfbf 1989 switch (the_consumer_data.type) {
3bd1e081 1990 case LTTNG_CONSUMER_KERNEL:
ffe60014 1991 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
1992 case LTTNG_CONSUMER32_UST:
1993 case LTTNG_CONSUMER64_UST:
ffe60014 1994 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
1995 default:
1996 ERR("Unknown consumer_data type");
a0377dfe 1997 abort();
3bd1e081
MD
1998 return -ENOSYS;
1999 }
3bd1e081
MD
2000}
2001
2002/*
2003 * Get the produced position
2004 *
2005 * Returns 0 on success, < 0 on error
2006 */
28ab034a 2007int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 2008{
fa29bfbf 2009 switch (the_consumer_data.type) {
3bd1e081 2010 case LTTNG_CONSUMER_KERNEL:
ffe60014 2011 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
2012 case LTTNG_CONSUMER32_UST:
2013 case LTTNG_CONSUMER64_UST:
ffe60014 2014 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
2015 default:
2016 ERR("Unknown consumer_data type");
a0377dfe 2017 abort();
3bd1e081
MD
2018 return -ENOSYS;
2019 }
2020}
2021
15055ce5
JD
2022/*
2023 * Get the consumed position (free-running counter position in bytes).
2024 *
2025 * Returns 0 on success, < 0 on error
2026 */
28ab034a 2027int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
15055ce5 2028{
fa29bfbf 2029 switch (the_consumer_data.type) {
15055ce5
JD
2030 case LTTNG_CONSUMER_KERNEL:
2031 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2032 case LTTNG_CONSUMER32_UST:
2033 case LTTNG_CONSUMER64_UST:
2034 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2035 default:
2036 ERR("Unknown consumer_data type");
a0377dfe 2037 abort();
15055ce5
JD
2038 return -ENOSYS;
2039 }
2040}
2041
3bd1e081 2042int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
28ab034a
JG
2043 int sock,
2044 struct pollfd *consumer_sockpoll)
3bd1e081 2045{
fa29bfbf 2046 switch (the_consumer_data.type) {
3bd1e081
MD
2047 case LTTNG_CONSUMER_KERNEL:
2048 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
2049 case LTTNG_CONSUMER32_UST:
2050 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
2051 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2052 default:
2053 ERR("Unknown consumer_data type");
a0377dfe 2054 abort();
3bd1e081
MD
2055 return -ENOSYS;
2056 }
2057}
2058
cd9adb8b 2059static void lttng_consumer_close_all_metadata()
d88aee68 2060{
fa29bfbf 2061 switch (the_consumer_data.type) {
d88aee68
DG
2062 case LTTNG_CONSUMER_KERNEL:
2063 /*
2064 * The Kernel consumer has a different metadata scheme so we don't
2065 * close anything because the stream will be closed by the session
2066 * daemon.
2067 */
2068 break;
2069 case LTTNG_CONSUMER32_UST:
2070 case LTTNG_CONSUMER64_UST:
2071 /*
2072 * Close all metadata streams. The metadata hash table is passed and
2073 * this call iterates over it by closing all wakeup fd. This is safe
2074 * because at this point we are sure that the metadata producer is
2075 * either dead or blocked.
2076 */
6d574024 2077 lttng_ustconsumer_close_all_metadata(metadata_ht);
d88aee68
DG
2078 break;
2079 default:
2080 ERR("Unknown consumer_data type");
a0377dfe 2081 abort();
d88aee68
DG
2082 }
2083}
/*
 * Clean up a metadata stream and free its memory.
 *
 * Removes the stream from the hash table `ht`, closes its output (including
 * the relayd connection, if any), destroys its tracer buffers, drops its
 * reference on the owning channel (deleting the channel when this was the
 * last reference and no initialization is pending) and finally frees the
 * stream itself.
 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	struct lttng_consumer_channel *channel = nullptr;
	bool free_channel = false;

	LTTNG_ASSERT(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	LTTNG_ASSERT(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	/*
	 * Lock order: consumer data -> channel -> stream -> metadata cache.
	 * This ordering must match the rest of the file to avoid deadlocks.
	 */
	pthread_mutex_lock(&the_consumer_data.lock);
	/*
	 * Note that this assumes that a stream's channel is never changed and
	 * that the stream's lock doesn't need to be taken to sample its
	 * channel.
	 */
	channel = stream->chan;
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&stream->lock);
	if (channel->metadata_cache) {
		/* Only applicable to userspace consumers. */
		pthread_mutex_lock(&channel->metadata_cache->lock);
	}

	/* Remove any reference to that stream. */
	consumer_stream_delete(stream, ht);

	/* Close down everything including the relayd if one. */
	consumer_stream_close_output(stream);
	/* Destroy tracer buffers of the stream. */
	consumer_stream_destroy_buffers(stream);

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&channel->refcount, 1) &&
	    !uatomic_read(&channel->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_channel = true;
	}
	stream->chan = nullptr;

	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for a NULL
	 * pointer value.
	 */
	channel->metadata_stream = nullptr;
	/* Wake up any thread waiting on metadata to be pushed for this channel. */
	channel->metadata_pushed_wait_queue.wake_all();

	if (channel->metadata_cache) {
		pthread_mutex_unlock(&channel->metadata_cache->lock);
	}
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	/* Channel deletion is done outside of the locks held above. */
	if (free_channel) {
		consumer_del_channel(channel);
	}

	/* Release the stream's reference on its trace chunk before freeing it. */
	lttng_trace_chunk_put(stream->trace_chunk);
	stream->trace_chunk = nullptr;
	consumer_stream_free(stream);
}
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Registers the stream in the metadata hash table as well as in the
 * per-channel-id and per-session-id lookup tables, updating the channel's
 * pending-initialization count along the way.
 */
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = metadata_ht;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	/* Lock order: consumer data -> channel -> channel timer -> stream. */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	lttng::urcu::read_lock_guard read_lock;

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	LTTNG_ASSERT(!node);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	lttng_ht_add_unique_u64(ht, &stream->node);

	/* Secondary index: look up streams by their channel id. */
	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
2220
8994307f
DG
2221/*
2222 * Delete data stream that are flagged for deletion (endpoint_status).
2223 */
cd9adb8b 2224static void validate_endpoint_status_data_stream()
8994307f
DG
2225{
2226 struct lttng_ht_iter iter;
2227 struct lttng_consumer_stream *stream;
2228
2229 DBG("Consumer delete flagged data stream");
2230
56047f5a
JG
2231 {
2232 lttng::urcu::read_lock_guard read_lock;
2233
2234 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
2235 /* Validate delete flag of the stream */
2236 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2237 continue;
2238 }
2239 /* Delete it right now */
2240 consumer_del_stream(stream, data_ht);
8994307f 2241 }
8994307f 2242 }
8994307f
DG
2243}
2244
2245/*
2246 * Delete metadata stream that are flagged for deletion (endpoint_status).
2247 */
28ab034a 2248static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
8994307f
DG
2249{
2250 struct lttng_ht_iter iter;
2251 struct lttng_consumer_stream *stream;
2252
2253 DBG("Consumer delete flagged metadata stream");
2254
a0377dfe 2255 LTTNG_ASSERT(pollset);
8994307f 2256
56047f5a
JG
2257 {
2258 lttng::urcu::read_lock_guard read_lock;
2259 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
2260 /* Validate delete flag of the stream */
2261 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2262 continue;
2263 }
2264 /*
2265 * Remove from pollset so the metadata thread can continue without
2266 * blocking on a deleted stream.
2267 */
2268 lttng_poll_del(pollset, stream->wait_fd);
8994307f 2269
56047f5a
JG
2270 /* Delete it right now */
2271 consumer_del_metadata_stream(stream, metadata_ht);
2272 }
8994307f 2273 }
8994307f
DG
2274}
/*
 * Thread polls on metadata file descriptors and write them on disk or on the
 * network.
 *
 * The poll set contains the read side of the consumer metadata pipe (through
 * which new metadata streams are announced) plus one fd per registered
 * metadata stream. The thread loops forever, consuming metadata as it becomes
 * readable and tearing streams down on hangup/error, until the poll set
 * becomes empty or an unrecoverable error occurs.
 */
void *consumer_thread_metadata_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_stream *stream = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);

	if (testpoint(consumerd_thread_metadata)) {
		goto error_testpoint;
	}

	health_code_update();

	DBG("Thread metadata poll started");

	/* Size is set to 1 for the consumer_metadata pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Metadata main loop started");

	while (true) {
	restart:
		health_code_update();
		health_poll_entry();
		DBG("Metadata poll wait");
		ret = lttng_poll_wait(&events, -1);
		DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Metadata event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				err = 0; /* All is OK */
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a metadata wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
				/* Event on the control pipe: a new stream or a shutdown. */
				if (revents & LPOLLIN) {
					ssize_t pipe_len;

					pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
								   &stream,
								   sizeof(stream)); /* NOLINT sizeof
										       used on a
										       pointer. */
					if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
									    pointer. */
						if (pipe_len < 0) {
							PERROR("read metadata stream");
						}
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since their might be data to consume.
						 */
						lttng_poll_del(
							&events,
							lttng_pipe_get_readfd(
								ctx->consumer_metadata_pipe));
						lttng_pipe_read_close(ctx->consumer_metadata_pipe);
						continue;
					}

					/* A NULL stream means that the state has changed. */
					if (stream == nullptr) {
						/* Check for deleted streams. */
						validate_endpoint_status_metadata_stream(&events);
						goto restart;
					}

					DBG("Adding metadata stream %d to poll set",
					    stream->wait_fd);

					/* Add metadata stream to the global poll events list */
					lttng_poll_add(
						&events, stream->wait_fd, LPOLLIN | LPOLLPRI);
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Metadata thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(
						&events,
						lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
					lttng_pipe_read_close(ctx->consumer_metadata_pipe);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/*
			 * RCU read-side lock held for the rest of this iteration while
			 * the stream looked up below is in use.
			 */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			stream = caa_container_of(node, struct lttng_consumer_stream, node);

			if (revents & (LPOLLIN | LPOLLPRI)) {
				/* Get the data out of the metadata file descriptor */
				DBG("Metadata available on fd %d", pollfd);
				LTTNG_ASSERT(stream->wait_fd == pollfd);

				do {
					health_code_update();

					len = ctx->on_buffer_ready(stream, ctx, false);
					/*
					 * We don't check the return value here since if we get
					 * a negative len, it means an error occurred thus we
					 * simply remove it from the poll set and free the
					 * stream.
					 */
				} while (len > 0);

				/* It's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean up stream from consumer and free it. */
					lttng_poll_del(&events, stream->wait_fd);
					consumer_del_metadata_stream(stream, metadata_ht);
				}
			} else if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Metadata fd %d is hup|err.", pollfd);
				if (!stream->hangup_flush_done &&
				    (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
				     the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
					DBG("Attempting to flush and consume the UST buffers");
					lttng_ustconsumer_on_stream_hangup(stream);

					/* We just flushed the stream now read it. */
					do {
						health_code_update();

						len = ctx->on_buffer_ready(stream, ctx, false);
						/*
						 * We don't check the return value here since if we
						 * get a negative len, it means an error occurred
						 * thus we simply remove it from the poll set and
						 * free the stream.
						 */
					} while (len > 0);
				}

				lttng_poll_del(&events, stream->wait_fd);
				/*
				 * This call update the channel states, closes file descriptors
				 * and securely free the stream.
				 */
				consumer_del_metadata_stream(stream, metadata_ht);
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}
			/* Release RCU lock for the stream looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	DBG("Metadata poll thread exiting");

	lttng_poll_clean(&events);
end_poll:
error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
2494
3bd1e081 2495/*
e4421fec 2496 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2497 * it to tracefile if necessary.
2498 */
7d980def 2499void *consumer_thread_data_poll(void *data)
3bd1e081 2500{
ff930959 2501 int num_rdy, high_prio, ret, i, err = -1;
cd9adb8b 2502 struct pollfd *pollfd = nullptr;
3bd1e081 2503 /* local view of the streams */
cd9adb8b 2504 struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
3bd1e081 2505 /* local view of consumer_data.fds_count */
8bdcc002
JG
2506 int nb_fd = 0;
2507 /* 2 for the consumer_data_pipe and wake up pipe */
2508 const int nb_pipes_fd = 2;
9a2fcf78
JD
2509 /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
2510 int nb_inactive_fd = 0;
97535efa 2511 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
00e2e675 2512 ssize_t len;
3bd1e081 2513
e7b994a3
DG
2514 rcu_register_thread();
2515
1fc79fb4
MD
2516 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2517
2d57de81
MD
2518 if (testpoint(consumerd_thread_data)) {
2519 goto error_testpoint;
2520 }
2521
9ce5646a
MD
2522 health_code_update();
2523
64803277 2524 local_stream = zmalloc<lttng_consumer_stream *>();
cd9adb8b 2525 if (local_stream == nullptr) {
4df6c8cb
MD
2526 PERROR("local_stream malloc");
2527 goto end;
2528 }
3bd1e081 2529
cd9adb8b 2530 while (true) {
9ce5646a
MD
2531 health_code_update();
2532
3bd1e081 2533 high_prio = 0;
3bd1e081
MD
2534
2535 /*
e4421fec 2536 * the fds set has been updated, we need to update our
3bd1e081
MD
2537 * local array as well
2538 */
fa29bfbf
SM
2539 pthread_mutex_lock(&the_consumer_data.lock);
2540 if (the_consumer_data.need_update) {
0e428499 2541 free(pollfd);
cd9adb8b 2542 pollfd = nullptr;
0e428499
DG
2543
2544 free(local_stream);
cd9adb8b 2545 local_stream = nullptr;
3bd1e081 2546
8bdcc002 2547 /* Allocate for all fds */
28ab034a
JG
2548 pollfd =
2549 calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2550 if (pollfd == nullptr) {
7a57cf92 2551 PERROR("pollfd malloc");
fa29bfbf 2552 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2553 goto end;
2554 }
2555
28ab034a
JG
2556 local_stream = calloc<lttng_consumer_stream *>(
2557 the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2558 if (local_stream == nullptr) {
7a57cf92 2559 PERROR("local_stream malloc");
fa29bfbf 2560 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2561 goto end;
2562 }
28ab034a
JG
2563 ret = update_poll_array(
2564 ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
3bd1e081
MD
2565 if (ret < 0) {
2566 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2567 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
fa29bfbf 2568 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2569 goto end;
2570 }
2571 nb_fd = ret;
fa29bfbf 2572 the_consumer_data.need_update = 0;
3bd1e081 2573 }
fa29bfbf 2574 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081 2575
4078b776 2576 /* No FDs and consumer_quit, consumer_cleanup the thread */
28ab034a
JG
2577 if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
2578 err = 0; /* All is OK */
4078b776
MD
2579 goto end;
2580 }
3bd1e081 2581 /* poll on the array of fds */
88f2b785 2582 restart:
261de637 2583 DBG("polling on %d fd", nb_fd + nb_pipes_fd);
cf0bcb51
JG
2584 if (testpoint(consumerd_thread_data_poll)) {
2585 goto end;
2586 }
9ce5646a 2587 health_poll_entry();
261de637 2588 num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
9ce5646a 2589 health_poll_exit();
3bd1e081
MD
2590 DBG("poll num_rdy : %d", num_rdy);
2591 if (num_rdy == -1) {
88f2b785
MD
2592 /*
2593 * Restart interrupted system call.
2594 */
2595 if (errno == EINTR) {
2596 goto restart;
2597 }
7a57cf92 2598 PERROR("Poll error");
f73fabfd 2599 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2600 goto end;
2601 } else if (num_rdy == 0) {
2602 DBG("Polling thread timed out");
2603 goto end;
2604 }
2605
80957876
JG
2606 if (caa_unlikely(data_consumption_paused)) {
2607 DBG("Data consumption paused, sleeping...");
2608 sleep(1);
2609 goto restart;
2610 }
2611
3bd1e081 2612 /*
50f8ae69 2613 * If the consumer_data_pipe triggered poll go directly to the
00e2e675
DG
2614 * beginning of the loop to update the array. We want to prioritize
2615 * array update over low-priority reads.
3bd1e081 2616 */
509bb1cf 2617 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
ab30f567 2618 ssize_t pipe_readlen;
04fdd819 2619
50f8ae69 2620 DBG("consumer_data_pipe wake up");
5c7248cd
JG
2621 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2622 &new_stream,
2623 sizeof(new_stream)); /* NOLINT sizeof used on
2624 a pointer. */
2625 if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
2626 */
6cd525e8 2627 PERROR("Consumer data pipe");
23f5f35d
DG
2628 /* Continue so we can at least handle the current stream(s). */
2629 continue;
2630 }
c869f647
DG
2631
2632 /*
2633 * If the stream is NULL, just ignore it. It's also possible that
2634 * the sessiond poll thread changed the consumer_quit state and is
2635 * waking us up to test it.
2636 */
cd9adb8b 2637 if (new_stream == nullptr) {
8994307f 2638 validate_endpoint_status_data_stream();
c869f647
DG
2639 continue;
2640 }
2641
c869f647 2642 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2643 continue;
2644 }
2645
02b3d176
DG
2646 /* Handle wakeup pipe. */
2647 if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
2648 char dummy;
2649 ssize_t pipe_readlen;
2650
28ab034a
JG
2651 pipe_readlen =
2652 lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
02b3d176
DG
2653 if (pipe_readlen < 0) {
2654 PERROR("Consumer data wakeup pipe");
2655 }
2656 /* We've been awakened to handle stream(s). */
2657 ctx->has_wakeup = 0;
2658 }
2659
3bd1e081
MD
2660 /* Take care of high priority channels first. */
2661 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2662 health_code_update();
2663
cd9adb8b 2664 if (local_stream[i] == nullptr) {
9617607b
DG
2665 continue;
2666 }
fb3a43a9 2667 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2668 DBG("Urgent read on fd %d", pollfd[i].fd);
2669 high_prio = 1;
6f9449c2 2670 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
d41f73b7 2671 /* it's ok to have an unavailable sub-buffer */
b64403e3 2672 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2673 /* Clean the stream and free it. */
2674 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2675 local_stream[i] = nullptr;
4078b776 2676 } else if (len > 0) {
28ab034a
JG
2677 local_stream[i]->has_data_left_to_be_read_before_teardown =
2678 1;
d41f73b7 2679 }
3bd1e081
MD
2680 }
2681 }
2682
4078b776
MD
2683 /*
2684 * If we read high prio channel in this loop, try again
2685 * for more high prio data.
2686 */
2687 if (high_prio) {
3bd1e081
MD
2688 continue;
2689 }
2690
2691 /* Take care of low priority channels. */
4078b776 2692 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2693 health_code_update();
2694
cd9adb8b 2695 if (local_stream[i] == nullptr) {
9617607b
DG
2696 continue;
2697 }
28ab034a
JG
2698 if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
2699 local_stream[i]->has_data) {
4078b776 2700 DBG("Normal read on fd %d", pollfd[i].fd);
6f9449c2 2701 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
4078b776 2702 /* it's ok to have an unavailable sub-buffer */
b64403e3 2703 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2704 /* Clean the stream and free it. */
2705 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2706 local_stream[i] = nullptr;
4078b776 2707 } else if (len > 0) {
28ab034a
JG
2708 local_stream[i]->has_data_left_to_be_read_before_teardown =
2709 1;
4078b776
MD
2710 }
2711 }
2712 }
2713
2714 /* Handle hangup and errors */
2715 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2716 health_code_update();
2717
cd9adb8b 2718 if (local_stream[i] == nullptr) {
9617607b
DG
2719 continue;
2720 }
28ab034a
JG
2721 if (!local_stream[i]->hangup_flush_done &&
2722 (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
2723 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2724 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
4078b776 2725 DBG("fd %d is hup|err|nval. Attempting flush and read.",
28ab034a 2726 pollfd[i].fd);
4078b776
MD
2727 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2728 /* Attempt read again, for the data we just flushed. */
c715ddc9 2729 local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
4078b776
MD
2730 }
2731 /*
c715ddc9
JG
2732 * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
2733 * performed. This type of flush ensures that a new packet is produced no
2734 * matter the consumed/produced positions are.
2735 *
2736 * This, in turn, causes the next pass to see that data available for the
2737 * stream. When we come back here, we can be assured that all available
2738 * data has been consumed and we can finally destroy the stream.
2739 *
4078b776
MD
2740 * If the poll flag is HUP/ERR/NVAL and we have
2741 * read no data in this pass, we can remove the
2742 * stream from its hash table.
2743 */
2744 if ((pollfd[i].revents & POLLHUP)) {
2745 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
c715ddc9 2746 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2747 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2748 local_stream[i] = nullptr;
4078b776
MD
2749 }
2750 } else if (pollfd[i].revents & POLLERR) {
2751 ERR("Error returned in polling fd %d.", pollfd[i].fd);
c715ddc9 2752 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2753 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2754 local_stream[i] = nullptr;
4078b776
MD
2755 }
2756 } else if (pollfd[i].revents & POLLNVAL) {
2757 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
c715ddc9 2758 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2759 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2760 local_stream[i] = nullptr;
3bd1e081
MD
2761 }
2762 }
cd9adb8b 2763 if (local_stream[i] != nullptr) {
c715ddc9 2764 local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
9617607b 2765 }
3bd1e081
MD
2766 }
2767 }
1fc79fb4
MD
2768 /* All is OK */
2769 err = 0;
3bd1e081
MD
2770end:
2771 DBG("polling thread exiting");
0e428499
DG
2772 free(pollfd);
2773 free(local_stream);
fb3a43a9
DG
2774
2775 /*
2776 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2777 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2778 * read side of the pipe. If we close them both, epoll_wait strangely does
2779 * not return and could create a endless wait period if the pipe is the
2780 * only tracked fd in the poll set. The thread will take care of closing
2781 * the read side.
fb3a43a9 2782 */
13886d2d 2783 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
fb3a43a9 2784
2d57de81 2785error_testpoint:
1fc79fb4
MD
2786 if (err) {
2787 health_error();
2788 ERR("Health error occurred in %s", __func__);
2789 }
2790 health_unregister(health_consumerd);
2791
e7b994a3 2792 rcu_unregister_thread();
cd9adb8b 2793 return nullptr;
3bd1e081
MD
2794}
2795
d8ef542d
MD
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 */
static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	/* Streams are indexed by their owning channel's key in this table. */
	ht = the_consumer_data.stream_per_chan_id_ht;

	/* RCU read-side lock held for the whole hash table traversal. */
	lttng::urcu::read_lock_guard read_lock;
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* Stream is already being deleted by another thread; skip it. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		switch (the_consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			/* Nothing to close for the kernel domain here. */
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			if (stream->metadata_flag) {
				/* Safe and protected by the stream lock. */
				lttng_ustconsumer_close_metadata(stream->chan);
			} else {
				/*
				 * Note: a mutex is taken internally within
				 * liblttng-ust-ctl to protect timer wakeup_fd
				 * use from concurrent close.
				 */
				lttng_ustconsumer_close_stream_wakeup(stream);
			}
			break;
		default:
			ERR("Unknown consumer_data type");
			abort();
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
}
2850
2851static void destroy_channel_ht(struct lttng_ht *ht)
2852{
2853 struct lttng_ht_iter iter;
2854 struct lttng_consumer_channel *channel;
2855 int ret;
2856
cd9adb8b 2857 if (ht == nullptr) {
d8ef542d
MD
2858 return;
2859 }
2860
56047f5a
JG
2861 {
2862 lttng::urcu::read_lock_guard read_lock;
2863
2864 cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
2865 ret = lttng_ht_del(ht, &iter);
2866 LTTNG_ASSERT(ret != 0);
2867 }
d8ef542d 2868 }
d8ef542d
MD
2869
2870 lttng_ht_destroy(ht);
2871}
2872
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 *
 * 'data' is the lttng_consumer_local_data of this consumer instance.
 * Always returns nullptr (pthread entry point).
 */
void *consumer_thread_channel_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	struct lttng_ht *channel_ht;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

	if (testpoint(consumerd_thread_channel)) {
		goto error_testpoint;
	}

	health_code_update();

	/* Local table mapping a channel's wait fd to its channel object. */
	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Size is set to 1 for the consumer_channel pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	/* The channel pipe carries ADD/DEL/QUIT commands from other threads. */
	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (true) {
	restart:
		health_code_update();
		DBG("Channel poll wait");
		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Channel event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				err = 0; /* All is OK */
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & LPOLLIN) {
					enum consumer_channel_action action;
					uint64_t key;

					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						if (ret < 0) {
							ERR("Error reading channel pipe");
						}
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
					{
						DBG("Adding channel %d to poll set", chan->wait_fd);

						lttng_ht_node_init_u64(&chan->wait_fd_node,
								       chan->wait_fd);
						lttng::urcu::read_lock_guard read_lock;
						lttng_ht_add_unique_u64(channel_ht,
									&chan->wait_fd_node);
						/* Add channel to the global poll events list */
						// FIXME: Empty flag on a pipe pollset, this might
						// hang on FreeBSD.
						lttng_poll_add(&events, chan->wait_fd, 0);
						break;
					}
					case CONSUMER_CHANNEL_DEL:
					{
						/*
						 * This command should never be called if the
						 * channel has streams monitored by either the data
						 * or metadata thread. The consumer only notify this
						 * thread with a channel del. command if it receives
						 * a destroy channel command from the session daemon
						 * that send it if a command prior to the
						 * GET_CHANNEL failed.
						 */

						lttng::urcu::read_lock_guard read_lock;
						chan = consumer_find_channel(key);
						if (!chan) {
							ERR("UST consumer get channel key %" PRIu64
							    " not found for del channel",
							    key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						LTTNG_ASSERT(ret == 0);

						switch (the_consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							health_code_update();
							/* Destroy streams that might have been left
							 * in the stream list. */
							clean_channel_stream_list(chan);
							break;
						default:
							ERR("Unknown consumer_data type");
							abort();
						}

						/*
						 * Release our own refcount. Force channel deletion
						 * even if streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						/* Re-poll: the fd set just changed. */
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since their might be data to consume.
						 */
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/* Event on a channel wait fd: look the channel up by fd. */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				LTTNG_ASSERT(ret == 0);

				/*
				 * This will close the wait fd for each stream associated to
				 * this channel AND monitored by the data/metadata thread thus
				 * will be clean by the right thread.
				 */
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1) &&
				    !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}

			/* Release RCU lock for the channel looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
error_testpoint:
	DBG("Channel poll thread exiting");
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
3121
331744e3 3122static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
28ab034a
JG
3123 struct pollfd *sockpoll,
3124 int client_socket)
331744e3
JD
3125{
3126 int ret;
3127
a0377dfe
FD
3128 LTTNG_ASSERT(ctx);
3129 LTTNG_ASSERT(sockpoll);
331744e3 3130
84382d49
MD
3131 ret = lttng_consumer_poll_socket(sockpoll);
3132 if (ret) {
331744e3
JD
3133 goto error;
3134 }
3135 DBG("Metadata connection on client_socket");
3136
3137 /* Blocking call, waiting for transmission */
3138 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3139 if (ctx->consumer_metadata_socket < 0) {
3140 WARN("On accept metadata");
3141 ret = -1;
3142 goto error;
3143 }
3144 ret = 0;
3145
3146error:
3147 return ret;
3148}
3149
3bd1e081
MD
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 *
 * 'data' is the lttng_consumer_local_data of this consumer instance.
 * Always returns nullptr (pthread entry point). On exit, it sets the
 * global quit flag and wakes the data, channel and health threads so the
 * whole consumer tears down gracefully.
 */
void *consumer_thread_sessiond_poll(void *data)
{
	int sock = -1, client_socket, ret, err = -1;
	/*
	 * structure to poll for incoming data on communication socket avoids
	 * making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	if (testpoint(consumerd_thread_sessiond)) {
		goto error_testpoint;
	}

	health_code_update();

	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	/* Remove any stale socket file from a previous run. */
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	client_socket = -1;

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* Command-processing loop: one session daemon command per iteration. */
	while (true) {
		health_code_update();

		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			if (ret > 0) {
				/* should exit */
				err = 0;
			}
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (CMM_LOAD_SHARED(consumer_quit)) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0; /* All is OK */
			goto end;
		}
		DBG("Received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_all_metadata();

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	CMM_STORE_SHARED(consumer_quit, 1);

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);

	notify_health_quit_pipe(health_quit_pipe);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return nullptr;
}
d41f73b7 3327
503fefca 3328static int post_consume(struct lttng_consumer_stream *stream,
28ab034a
JG
3329 const struct stream_subbuffer *subbuffer,
3330 struct lttng_consumer_local_data *ctx)
f96af312 3331{
503fefca 3332 size_t i;
f96af312 3333 int ret = 0;
28ab034a
JG
3334 const size_t count =
3335 lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
f96af312 3336
503fefca
JG
3337 for (i = 0; i < count; i++) {
3338 const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
28ab034a 3339 &stream->read_subbuffer_ops.post_consume_cbs, i);
503fefca
JG
3340
3341 ret = op(stream, subbuffer, ctx);
3342 if (ret) {
3343 goto end;
f96af312 3344 }
f96af312 3345 }
f96af312
JG
3346end:
3347 return ret;
3348}
3349
/*
 * Read and consume one sub-buffer of 'stream'.
 *
 * Unless 'locked_by_caller' is true, the stream lock is taken (and
 * released on return) through the stream's read_subbuffer_ops; when it
 * is true, the lock is asserted to be held. Pending stream rotations
 * are performed both before extracting the packet and after consuming
 * it.
 *
 * Returns the number of bytes written for the consumed sub-buffer, 0
 * when no data was available, or a negative value on error.
 */
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
				      struct lttng_consumer_local_data *ctx,
				      bool locked_by_caller)
{
	ssize_t ret, written_bytes = 0;
	int rotation_ret;
	struct stream_subbuffer subbuffer = {};
	enum get_next_subbuffer_status get_next_status;

	if (!locked_by_caller) {
		stream->read_subbuffer_ops.lock(stream);
	} else {
		stream->read_subbuffer_ops.assert_locked(stream);
	}

	if (stream->read_subbuffer_ops.on_wake_up) {
		ret = stream->read_subbuffer_ops.on_wake_up(stream);
		if (ret) {
			goto end;
		}
	}

	/*
	 * If the stream was flagged to be ready for rotation before we extract
	 * the next packet, rotate it now.
	 */
	if (stream->rotate_ready) {
		DBG("Rotate stream before consuming data");
		ret = lttng_consumer_rotate_stream(stream);
		if (ret < 0) {
			ERR("Stream rotation error before consuming data");
			goto end;
		}
	}

	get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
	switch (get_next_status) {
	case GET_NEXT_SUBBUFFER_STATUS_OK:
		break;
	case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
		/* Not an error. */
		ret = 0;
		goto sleep_stream;
	case GET_NEXT_SUBBUFFER_STATUS_ERROR:
		ret = -1;
		goto end;
	default:
		abort();
	}

	ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
	if (ret) {
		goto error_put_subbuf;
	}

	written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
	if (written_bytes <= 0) {
		ERR("Error consuming subbuffer: (%zd)", written_bytes);
		ret = (int) written_bytes;
		goto error_put_subbuf;
	}

	/* Release the sub-buffer back to the ring buffer. */
	ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	if (ret) {
		goto end;
	}

	ret = post_consume(stream, &subbuffer, ctx);
	if (ret) {
		goto end;
	}

	/*
	 * After extracting the packet, we check if the stream is now ready to
	 * be rotated and perform the action immediately.
	 *
	 * Don't overwrite `ret` as callers expect the number of bytes
	 * consumed to be returned on success.
	 */
	rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
	if (rotation_ret == 1) {
		rotation_ret = lttng_consumer_rotate_stream(stream);
		if (rotation_ret < 0) {
			ret = rotation_ret;
			ERR("Stream rotation error after consuming data");
			goto end;
		}

	} else if (rotation_ret < 0) {
		ret = rotation_ret;
		ERR("Failed to check if stream was ready to rotate after consuming data");
		goto end;
	}

sleep_stream:
	if (stream->read_subbuffer_ops.on_sleep) {
		stream->read_subbuffer_ops.on_sleep(stream, ctx);
	}

	ret = written_bytes;
end:
	if (!locked_by_caller) {
		stream->read_subbuffer_ops.unlock(stream);
	}

	return ret;
error_put_subbuf:
	/* Best-effort release; the original error in `ret` is preserved. */
	(void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	goto end;
}
3460
/*
 * Per-domain hook invoked when a new stream is received: dispatch to the
 * kernel or UST implementation based on the consumer type.
 *
 * Returns the value of the domain-specific handler, or -ENOSYS for an
 * unknown consumer type (unreachable after abort()).
 */
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_on_recv_stream(stream);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_on_recv_stream(stream);
	default:
		ERR("Unknown consumer_data type");
		abort();
		/* Unreachable; keeps the compiler's return-path check happy. */
		return -ENOSYS;
	}
}
e4421fec
DG
3475
3476/*
3477 * Allocate and set consumer data hash tables.
3478 */
cd9adb8b 3479int lttng_consumer_init()
e4421fec 3480{
fa29bfbf
SM
3481 the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3482 if (!the_consumer_data.channel_ht) {
282dadbc
MD
3483 goto error;
3484 }
3485
28ab034a 3486 the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3487 if (!the_consumer_data.channels_by_session_id_ht) {
5c3892a6
JG
3488 goto error;
3489 }
3490
fa29bfbf
SM
3491 the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3492 if (!the_consumer_data.relayd_ht) {
282dadbc
MD
3493 goto error;
3494 }
3495
fa29bfbf
SM
3496 the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3497 if (!the_consumer_data.stream_list_ht) {
282dadbc
MD
3498 goto error;
3499 }
3500
28ab034a 3501 the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3502 if (!the_consumer_data.stream_per_chan_id_ht) {
282dadbc
MD
3503 goto error;
3504 }
3505
3506 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3507 if (!data_ht) {
3508 goto error;
3509 }
3510
3511 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3512 if (!metadata_ht) {
3513 goto error;
3514 }
3515
fa29bfbf
SM
3516 the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3517 if (!the_consumer_data.chunk_registry) {
28cc88f3
JG
3518 goto error;
3519 }
3520
282dadbc
MD
3521 return 0;
3522
3523error:
3524 return -1;
e4421fec 3525}
7735ef9e
DG
3526
/*
 * Process the ADD_RELAYD command receive by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire a RCU read side lock before calling it.
 *
 * The received fd's ownership is transferred to the relayd object on
 * success; on any failure a status is reported back to the session
 * daemon and the fd (if received) is closed.
 */
void consumer_add_relayd_socket(uint64_t net_seq_idx,
				int sock_type,
				struct lttng_consumer_local_data *ctx,
				int sock,
				struct pollfd *consumer_sockpoll,
				uint64_t sessiond_id,
				uint64_t relayd_session_id,
				uint32_t relayd_version_major,
				uint32_t relayd_version_minor,
				enum lttcomm_sock_proto relayd_socket_protocol)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct consumer_relayd_sock_pair *relayd = nullptr;

	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(sock >= 0);
	ASSERT_RCU_READ_LOCKED();

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == nullptr) {
		/* The control socket is always the first one registered. */
		LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == nullptr) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status message to
		 * we can notify the session daemon and continue our work without
		 * killing everything.
		 */
	} else {
		/*
		 * relayd key should never be found for control socket.
		 */
		LTTNG_ASSERT(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		/* Needing to exit in the middle of a command: error. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		fd = -1; /* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
		 * don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->control_sock.sock, fd, relayd_socket_protocol);

		/* Assign version values. */
		relayd->control_sock.major = relayd_version_major;
		relayd->control_sock.minor = relayd_version_minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->data_sock.sock, fd, relayd_socket_protocol);
		/* Assign version values. */
		relayd->data_sock.major = relayd_version_major;
		relayd->data_sock.minor = relayd_version_minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	/* Check the result of lttcomm_populate_sock_from_open_socket(). */
	if (ret < 0) {
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
	    sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
	    relayd->net_seq_idx,
	    fd);
	/*
	 * We gave the ownership of the fd to the relayd structure. Set the
	 * fd to -1 so we don't call close() on it in the error path below.
	 */
	fd = -1;

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	relayd->ctx = ctx;
	add_relayd(relayd);

	/* All good! */
	return;

error:
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	if (relayd_created) {
		free(relayd);
	}
}
ca22feea 3693
f7079f67
DG
3694/*
3695 * Search for a relayd associated to the session id and return the reference.
3696 *
3697 * A rcu read side lock MUST be acquire before calling this function and locked
3698 * until the relayd object is no longer necessary.
3699 */
3700static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3701{
3702 struct lttng_ht_iter iter;
cd9adb8b 3703 struct consumer_relayd_sock_pair *relayd = nullptr;
f7079f67 3704
48b7cdc2
FD
3705 ASSERT_RCU_READ_LOCKED();
3706
f7079f67 3707 /* Iterate over all relayd since they are indexed by net_seq_idx. */
28ab034a 3708 cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
18261bd1
DG
3709 /*
3710 * Check by sessiond id which is unique here where the relayd session
3711 * id might not be when having multiple relayd.
3712 */
3713 if (relayd->sessiond_session_id == id) {
f7079f67 3714 /* Found the relayd. There can be only one per id. */
18261bd1 3715 goto found;
f7079f67
DG
3716 }
3717 }
3718
cd9adb8b 3719 return nullptr;
18261bd1
DG
3720
3721found:
f7079f67
DG
3722 return relayd;
3723}
3724
/*
 * Check if for a given session id there is still data needed to be extract
 * from the buffers.
 *
 * Both the consumer data lock and the RCU read-side lock are taken for the
 * whole duration of the check; the per-stream lock is taken around each
 * individual stream query.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	/* Domain-specific (kernel vs UST) per-stream data pending check. */
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	lttng::urcu::read_lock_guard read_lock;
	pthread_mutex_lock(&the_consumer_data.lock);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		abort();
	}

	/* Ease our life a bit */
	ht = the_consumer_data.stream_list_ht;

	/* First pass: check the local buffers of every stream of the session. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&id, lttng_ht_seed),
					  ht->match_fct,
					  &id,
					  &iter.iter,
					  stream,
					  node_session_id.node)
	{
		pthread_mutex_lock(&stream->lock);

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		pthread_mutex_unlock(&stream->lock);
	}

	/* Second pass: ask the relayd (if any) about in-flight network data. */
	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
		if (ret < 0) {
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			/* Communication error thus the relayd so no data pending. */
			goto data_not_pending;
		}

		cds_lfht_for_each_entry_duplicate(ht->ht,
						  ht->hash_fct(&id, lttng_ht_seed),
						  ht->match_fct,
						  &id,
						  &iter.iter,
						  stream,
						  node_session_id.node)
		{
			if (stream->metadata_flag) {
				ret = relayd_quiescent_control(&relayd->control_sock,
							       stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
							  stream->relayd_stream_id,
							  stream->next_net_seq_num - 1);
			}

			if (ret == 1) {
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_pending;
			} else if (ret < 0) {
				ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
				    relayd->net_seq_idx);
				lttng_consumer_cleanup_relayd(relayd);
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_not_pending;
			}
		}

		/* Send end command for data pending. */
		ret = relayd_end_data_pending(
			&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 1;
}
f50f23d9
DG
3863
3864/*
3865 * Send a ret code status message to the sessiond daemon.
3866 *
3867 * Return the sendmsg() return value.
3868 */
3869int consumer_send_status_msg(int sock, int ret_code)
3870{
3871 struct lttcomm_consumer_status_msg msg;
3872
53efb85a 3873 memset(&msg, 0, sizeof(msg));
97535efa 3874 msg.ret_code = (lttcomm_return_code) ret_code;
f50f23d9
DG
3875
3876 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3877}
ffe60014
DG
3878
3879/*
3880 * Send a channel status message to the sessiond daemon.
3881 *
3882 * Return the sendmsg() return value.
3883 */
28ab034a 3884int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
ffe60014
DG
3885{
3886 struct lttcomm_consumer_status_channel msg;
3887
a0377dfe 3888 LTTNG_ASSERT(sock >= 0);
ffe60014 3889
53efb85a 3890 memset(&msg, 0, sizeof(msg));
ffe60014 3891 if (!channel) {
0c759fc9 3892 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
ffe60014 3893 } else {
0c759fc9 3894 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ffe60014
DG
3895 msg.key = channel->key;
3896 msg.stream_count = channel->streams.count;
3897 }
3898
3899 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3900}
5c786ded 3901
d07ceecd 3902unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
28ab034a
JG
3903 unsigned long produced_pos,
3904 uint64_t nb_packets_per_stream,
3905 uint64_t max_sb_size)
5c786ded 3906{
d07ceecd 3907 unsigned long start_pos;
5c786ded 3908
d07ceecd 3909 if (!nb_packets_per_stream) {
28ab034a 3910 return consumed_pos; /* Grab everything */
d07ceecd 3911 }
1cbd136b 3912 start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
d07ceecd
MD
3913 start_pos -= max_sb_size * nb_packets_per_stream;
3914 if ((long) (start_pos - consumed_pos) < 0) {
28ab034a 3915 return consumed_pos; /* Grab everything */
d07ceecd
MD
3916 }
3917 return start_pos;
5c786ded 3918}
a1ae2ea5 3919
c1dcb8bb
JG
3920/* Stream lock must be held by the caller. */
3921static int sample_stream_positions(struct lttng_consumer_stream *stream,
28ab034a
JG
3922 unsigned long *produced,
3923 unsigned long *consumed)
c1dcb8bb
JG
3924{
3925 int ret;
3926
3927 ASSERT_LOCKED(stream->lock);
3928
3929 ret = lttng_consumer_sample_snapshot_positions(stream);
3930 if (ret < 0) {
3931 ERR("Failed to sample snapshot positions");
3932 goto end;
3933 }
3934
3935 ret = lttng_consumer_get_produced_snapshot(stream, produced);
3936 if (ret < 0) {
3937 ERR("Failed to sample produced position");
3938 goto end;
3939 }
3940
3941 ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
3942 if (ret < 0) {
3943 ERR("Failed to sample consumed position");
3944 goto end;
3945 }
3946
3947end:
3948 return ret;
3949}
3950
/*
 * Sample the rotate position for all the streams of a channel. If a stream
 * is already at the rotate position (produced == consumed), we flag it as
 * ready for rotation. The rotation of ready streams occurs after we have
 * replied to the session daemon that we have finished sampling the positions.
 * Must be called with RCU read-side lock held to ensure existence of channel.
 *
 * The channel lock is held for the whole operation; each stream lock is
 * taken in turn while its position is sampled.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
				  uint64_t key,
				  uint64_t relayd_id)
{
	int ret;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
	struct lttng_dynamic_array stream_rotation_positions;
	uint64_t next_chunk_id, stream_count = 0;
	enum lttng_trace_chunk_status chunk_status;
	const bool is_local_trace = relayd_id == -1ULL;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	bool rotating_to_new_chunk = true;
	/* Array of `struct lttng_consumer_stream *` */
	struct lttng_dynamic_pointer_array streams_packet_to_open;
	size_t stream_idx;

	ASSERT_RCU_READ_LOCKED();

	DBG("Consumer sample rotate position for channel %" PRIu64, key);

	lttng_dynamic_array_init(&stream_rotation_positions,
				 sizeof(struct relayd_stream_rotation_position),
				 nullptr);
	lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);

	lttng::urcu::read_lock_guard read_lock;

	pthread_mutex_lock(&channel->lock);
	LTTNG_ASSERT(channel->trace_chunk);
	chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret = -1;
		goto end_unlock_channel;
	}

	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		unsigned long produced_pos = 0, consumed_pos = 0;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		if (stream->trace_chunk == stream->chan->trace_chunk) {
			rotating_to_new_chunk = false;
		}

		/*
		 * Do not flush a packet when rotating from a NULL trace
		 * chunk. The stream has no means to output data, and the prior
		 * rotation which rotated to NULL performed that side-effect
		 * already. No new data can be produced when a stream has no
		 * associated trace chunk (e.g. a stop followed by a rotate).
		 */
		if (stream->trace_chunk) {
			bool flush_active;

			if (stream->metadata_flag) {
				/*
				 * Don't produce an empty metadata packet,
				 * simply close the current one.
				 *
				 * Metadata is regenerated on every trace chunk
				 * switch; there is no concern that no data was
				 * produced.
				 */
				flush_active = true;
			} else {
				/*
				 * Only flush an empty packet if the "packet
				 * open" could not be performed on transition
				 * to a new trace chunk and no packets were
				 * consumed within the chunk's lifetime.
				 */
				if (stream->opened_packet_in_current_trace_chunk) {
					flush_active = true;
				} else {
					/*
					 * Stream could have been full at the
					 * time of rotation, but then have had
					 * no activity at all.
					 *
					 * It is important to flush a packet
					 * to prevent 0-length files from being
					 * produced as most viewers choke on
					 * them.
					 *
					 * Unfortunately viewers will not be
					 * able to know that tracing was active
					 * for this stream during this trace
					 * chunk's lifetime.
					 */
					ret = sample_stream_positions(
						stream, &produced_pos, &consumed_pos);
					if (ret) {
						goto end_unlock_stream;
					}

					/*
					 * Don't flush an empty packet if data
					 * was produced; it will be consumed
					 * before the rotation completes.
					 */
					flush_active = produced_pos != consumed_pos;
					if (!flush_active) {
						const char *trace_chunk_name;
						uint64_t trace_chunk_id;

						chunk_status = lttng_trace_chunk_get_name(
							stream->trace_chunk,
							&trace_chunk_name,
							nullptr);
						if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
							trace_chunk_name = "none";
						}

						/*
						 * Consumer trace chunks are
						 * never anonymous.
						 */
						chunk_status = lttng_trace_chunk_get_id(
							stream->trace_chunk, &trace_chunk_id);
						LTTNG_ASSERT(chunk_status ==
							     LTTNG_TRACE_CHUNK_STATUS_OK);

						DBG("Unable to open packet for stream during trace chunk's lifetime. "
						    "Flushing an empty packet to prevent an empty file from being created: "
						    "stream id = %" PRIu64
						    ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
						    stream->key,
						    trace_chunk_name,
						    trace_chunk_id);
					}
				}
			}

			/*
			 * Close the current packet before sampling the
			 * ring buffer positions.
			 */
			ret = consumer_stream_flush_buffer(stream, flush_active);
			if (ret < 0) {
				ERR("Failed to flush stream %" PRIu64 " during channel rotation",
				    stream->key);
				goto end_unlock_stream;
			}
		}

		ret = lttng_consumer_take_snapshot(stream);
		if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
			ERR("Failed to sample snapshot position during channel rotation");
			goto end_unlock_stream;
		}
		if (!ret) {
			ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
			if (ret < 0) {
				ERR("Failed to sample produced position during channel rotation");
				goto end_unlock_stream;
			}

			ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
			if (ret < 0) {
				ERR("Failed to sample consumed position during channel rotation");
				goto end_unlock_stream;
			}
		}
		/*
		 * Align produced position on the start-of-packet boundary of the first
		 * packet going into the next trace chunk.
		 */
		produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
		if (consumed_pos == produced_pos) {
			DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
			    stream->key,
			    produced_pos,
			    consumed_pos);
			stream->rotate_ready = true;
		} else {
			DBG("Different consumed and produced positions "
			    "for stream %" PRIu64 " produced = %lu consumed = %lu",
			    stream->key,
			    produced_pos,
			    consumed_pos);
		}
		/*
		 * The rotation position is based on the packet_seq_num of the
		 * packet following the last packet that was consumed for this
		 * stream, incremented by the offset between produced and
		 * consumed positions. This rotation position is a lower bound
		 * (inclusive) at which the next trace chunk starts. Since it
		 * is a lower bound, it is OK if the packet_seq_num does not
		 * correspond exactly to the same packet identified by the
		 * consumed_pos, which can happen in overwrite mode.
		 */
		if (stream->sequence_number_unavailable) {
			/*
			 * Rotation should never be performed on a session which
			 * interacts with a pre-2.8 lttng-modules, which does
			 * not implement packet sequence number.
			 */
			ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
			    stream->key);
			ret = -1;
			goto end_unlock_stream;
		}
		stream->rotate_position = stream->last_sequence_number + 1 +
			((produced_pos - consumed_pos) / stream->max_sb_size);
		DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
		    stream->key,
		    stream->rotate_position);

		if (!is_local_trace) {
			/*
			 * The relay daemon control protocol expects a rotation
			 * position as "the sequence number of the first packet
			 * _after_ the current trace chunk".
			 */
			const struct relayd_stream_rotation_position position = {
				.stream_id = stream->relayd_stream_id,
				.rotate_at_seq_num = stream->rotate_position,
			};

			ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
							      &position);
			if (ret) {
				ERR("Failed to allocate stream rotation position");
				goto end_unlock_stream;
			}
			stream_count++;
		}

		stream->opened_packet_in_current_trace_chunk = false;

		if (rotating_to_new_chunk && !stream->metadata_flag) {
			/*
			 * Attempt to flush an empty packet as close to the
			 * rotation point as possible. In the event where a
			 * stream remains inactive after the rotation point,
			 * this ensures that the new trace chunk has a
			 * beginning timestamp set at the begining of the
			 * trace chunk instead of only creating an empty
			 * packet when the trace chunk is stopped.
			 *
			 * This indicates to the viewers that the stream
			 * was being recorded, but more importantly it
			 * allows viewers to determine a useable trace
			 * intersection.
			 *
			 * This presents a problem in the case where the
			 * ring-buffer is completely full.
			 *
			 * Consider the following scenario:
			 *   - The consumption of data is slow (slow network,
			 *     for instance),
			 *   - The ring buffer is full,
			 *   - A rotation is initiated,
			 *     - The flush below does nothing (no space left to
			 *       open a new packet),
			 *   - The other streams rotate very soon, and new
			 *     data is produced in the new chunk,
			 *   - This stream completes its rotation long after the
			 *     rotation was initiated
			 *   - The session is stopped before any event can be
			 *     produced in this stream's buffers.
			 *
			 * The resulting trace chunk will have a single packet
			 * temporaly at the end of the trace chunk for this
			 * stream making the stream intersection more narrow
			 * than it should be.
			 *
			 * To work-around this, an empty flush is performed
			 * after the first consumption of a packet during a
			 * rotation if open_packet fails. The idea is that
			 * consuming a packet frees enough space to switch
			 * packets in this scenario and allows the tracer to
			 * "stamp" the beginning of the new trace chunk at the
			 * earliest possible point.
			 *
			 * The packet open is performed after the channel
			 * rotation to ensure that no attempt to open a packet
			 * is performed in a stream that has no active trace
			 * chunk.
			 */
			ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
								      stream);
			if (ret) {
				PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
				ret = -1;
				goto end_unlock_stream;
			}
		}

		pthread_mutex_unlock(&stream->lock);
	}
	stream = nullptr;

	if (!is_local_trace) {
		/* Push the sampled rotation positions to the relay daemon. */
		relayd = consumer_find_relayd(relayd_id);
		if (!relayd) {
			ERR("Failed to find relayd %" PRIu64, relayd_id);
			ret = -1;
			goto end_unlock_channel;
		}

		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_rotate_streams(&relayd->control_sock,
					    stream_count,
					    rotating_to_new_chunk ? &next_chunk_id : nullptr,
					    (const struct relayd_stream_rotation_position *)
						    stream_rotation_positions.buffer.data);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end_unlock_channel;
		}
	}

	/* See the rationale in the long comment above (empty-packet flush). */
	for (stream_idx = 0;
	     stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
	     stream_idx++) {
		enum consumer_stream_open_packet_status status;

		stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
			&streams_packet_to_open, stream_idx);

		pthread_mutex_lock(&stream->lock);
		status = consumer_stream_open_packet(stream);
		pthread_mutex_unlock(&stream->lock);
		switch (status) {
		case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
			DBG("Opened a packet after a rotation: stream id = %" PRIu64
			    ", channel name = %s, session id = %" PRIu64,
			    stream->key,
			    stream->chan->name,
			    stream->chan->session_id);
			break;
		case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
			/*
			 * Can't open a packet as there is no space left
			 * in the buffer. A new packet will be opened
			 * once one has been consumed.
			 */
			DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
			    ", channel name = %s, session id = %" PRIu64,
			    stream->key,
			    stream->chan->name,
			    stream->chan->session_id);
			break;
		case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
			/* Logged by callee. */
			ret = -1;
			goto end_unlock_channel;
		default:
			abort();
		}
	}

	pthread_mutex_unlock(&channel->lock);
	ret = 0;
	goto end;

end_unlock_stream:
	pthread_mutex_unlock(&stream->lock);
end_unlock_channel:
	pthread_mutex_unlock(&channel->lock);
end:
	lttng_dynamic_array_reset(&stream_rotation_positions);
	lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
	return ret;
}
4343
28ab034a 4344static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4345{
4346 int ret = 0;
4347 unsigned long consumed_pos_before, consumed_pos_after;
4348
4349 ret = lttng_consumer_sample_snapshot_positions(stream);
4350 if (ret < 0) {
4351 ERR("Taking snapshot positions");
4352 goto end;
4353 }
4354
4355 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4356 if (ret < 0) {
4357 ERR("Consumed snapshot position");
4358 goto end;
4359 }
4360
fa29bfbf 4361 switch (the_consumer_data.type) {
5f3aff8b
MD
4362 case LTTNG_CONSUMER_KERNEL:
4363 ret = kernctl_buffer_clear(stream->wait_fd);
4364 if (ret < 0) {
96393977 4365 ERR("Failed to clear kernel stream (ret = %d)", ret);
5f3aff8b
MD
4366 goto end;
4367 }
4368 break;
4369 case LTTNG_CONSUMER32_UST:
4370 case LTTNG_CONSUMER64_UST:
881fc67f
MD
4371 ret = lttng_ustconsumer_clear_buffer(stream);
4372 if (ret < 0) {
4373 ERR("Failed to clear ust stream (ret = %d)", ret);
4374 goto end;
4375 }
5f3aff8b
MD
4376 break;
4377 default:
4378 ERR("Unknown consumer_data type");
4379 abort();
4380 }
4381
4382 ret = lttng_consumer_sample_snapshot_positions(stream);
4383 if (ret < 0) {
4384 ERR("Taking snapshot positions");
4385 goto end;
4386 }
4387 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4388 if (ret < 0) {
4389 ERR("Consumed snapshot position");
4390 goto end;
4391 }
4392 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4393end:
4394 return ret;
4395}
4396
28ab034a 4397static int consumer_clear_stream(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4398{
4399 int ret;
4400
cd9adb8b 4401 ret = consumer_stream_flush_buffer(stream, true);
5f3aff8b 4402 if (ret < 0) {
28ab034a 4403 ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4404 ret = LTTCOMM_CONSUMERD_FATAL;
4405 goto error;
4406 }
4407
4408 ret = consumer_clear_buffer(stream);
4409 if (ret < 0) {
28ab034a 4410 ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4411 ret = LTTCOMM_CONSUMERD_FATAL;
4412 goto error;
4413 }
4414
4415 ret = LTTCOMM_CONSUMERD_SUCCESS;
4416error:
4417 return ret;
4418}
4419
28ab034a 4420static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
5f3aff8b
MD
4421{
4422 int ret;
4423 struct lttng_consumer_stream *stream;
4424
56047f5a 4425 lttng::urcu::read_lock_guard read_lock;
5f3aff8b 4426 pthread_mutex_lock(&channel->lock);
28ab034a 4427 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
5f3aff8b
MD
4428 health_code_update();
4429 pthread_mutex_lock(&stream->lock);
4430 ret = consumer_clear_stream(stream);
4431 if (ret) {
4432 goto error_unlock;
4433 }
4434 pthread_mutex_unlock(&stream->lock);
4435 }
4436 pthread_mutex_unlock(&channel->lock);
5f3aff8b
MD
4437 return 0;
4438
4439error_unlock:
4440 pthread_mutex_unlock(&stream->lock);
4441 pthread_mutex_unlock(&channel->lock);
5f3aff8b
MD
4442 return ret;
4443}
4444
02d02e31
JD
4445/*
4446 * Check if a stream is ready to be rotated after extracting it.
4447 *
4448 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4449 * error. Stream lock must be held.
4450 */
4451int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4452{
28ab034a
JG
4453 DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
4454 " last_sequence_number %" PRIu64,
4455 stream->key,
4456 stream->rotate_ready,
4457 stream->rotate_position,
4458 stream->last_sequence_number);
02d02e31 4459 if (stream->rotate_ready) {
a40a503f 4460 return 1;
02d02e31
JD
4461 }
4462
4463 /*
a40a503f
MD
4464 * If packet seq num is unavailable, it means we are interacting
4465 * with a pre-2.8 lttng-modules which does not implement the
4466 * sequence number. Rotation should never be used by sessiond in this
4467 * scenario.
02d02e31 4468 */
a40a503f
MD
4469 if (stream->sequence_number_unavailable) {
4470 ERR("Internal error: rotation used on stream %" PRIu64
28ab034a
JG
4471 " with unavailable sequence number",
4472 stream->key);
a40a503f 4473 return -1;
02d02e31
JD
4474 }
4475
28ab034a 4476 if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
a40a503f 4477 return 0;
02d02e31
JD
4478 }
4479
a40a503f
MD
4480 /*
4481 * Rotate position not reached yet. The stream rotate position is
4482 * the position of the next packet belonging to the next trace chunk,
4483 * but consumerd considers rotation ready when reaching the last
4484 * packet of the current chunk, hence the "rotate_position - 1".
4485 */
f8528c7a 4486
28ab034a
JG
4487 DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
4488 " rotate_position %" PRIu64,
4489 stream->key,
4490 stream->last_sequence_number,
4491 stream->rotate_position);
a40a503f
MD
4492 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4493 return 1;
02d02e31 4494 }
02d02e31 4495
a40a503f 4496 return 0;
02d02e31
JD
4497}
4498
d73bf3d7
JD
4499/*
4500 * Reset the state for a stream after a rotation occurred.
4501 */
4502void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
4503{
28ab034a 4504 DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
a40a503f 4505 stream->rotate_position = -1ULL;
d73bf3d7
JD
4506 stream->rotate_ready = false;
4507}
4508
4509/*
4510 * Perform the rotation a local stream file.
4511 */
28ab034a 4512static int rotate_local_stream(struct lttng_consumer_stream *stream)
d73bf3d7 4513{
d2956687 4514 int ret = 0;
d73bf3d7 4515
d2956687 4516 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
28ab034a
JG
4517 stream->key,
4518 stream->chan->key);
d73bf3d7 4519 stream->tracefile_size_current = 0;
d2956687 4520 stream->tracefile_count_current = 0;
d73bf3d7 4521
d2956687
JG
4522 if (stream->out_fd >= 0) {
4523 ret = close(stream->out_fd);
4524 if (ret) {
4525 PERROR("Failed to close stream out_fd of channel \"%s\"",
28ab034a 4526 stream->chan->name);
d2956687
JG
4527 }
4528 stream->out_fd = -1;
4529 }
d73bf3d7 4530
d2956687 4531 if (stream->index_file) {
d73bf3d7 4532 lttng_index_file_put(stream->index_file);
cd9adb8b 4533 stream->index_file = nullptr;
d73bf3d7
JD
4534 }
4535
d2956687
JG
4536 if (!stream->trace_chunk) {
4537 goto end;
4538 }
d73bf3d7 4539
d2956687 4540 ret = consumer_stream_create_output_files(stream, true);
d73bf3d7
JD
4541end:
4542 return ret;
d73bf3d7
JD
4543}
4544
/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * The stream's reference on its previous trace chunk is released and,
 * when the channel moved to a new chunk, a reference on that new chunk
 * is acquired for the stream.
 *
 * Return 0 on success, a negative number of error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	DBG("Consumer rotate stream %" PRIu64, stream->key);

	/*
	 * Update the stream's 'current' chunk to the session's (channel)
	 * now-current chunk.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	if (stream->chan->trace_chunk == stream->trace_chunk) {
		/*
		 * A channel can be rotated and not have a "next" chunk
		 * to transition to. In that case, the channel's "current chunk"
		 * has not been closed yet, but it has not been updated to
		 * a "next" trace chunk either. Hence, the stream, like its
		 * parent channel, becomes part of no chunk and can't output
		 * anything until a new trace chunk is created.
		 */
		stream->trace_chunk = nullptr;
	} else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
		ret = -1;
		goto error;
	} else {
		/*
		 * Update the stream's trace chunk to its parent channel's
		 * current trace chunk.
		 */
		stream->trace_chunk = stream->chan->trace_chunk;
	}

	/* A net_seq_idx of -1ULL means the stream is written locally. */
	if (stream->net_seq_idx == (uint64_t) -1ULL) {
		ret = rotate_local_stream(stream);
		if (ret < 0) {
			ERR("Failed to rotate stream, ret = %i", ret);
			goto error;
		}
	}

	if (stream->metadata_flag && stream->trace_chunk) {
		/*
		 * If the stream has transitioned to a new trace
		 * chunk, the metadata should be re-dumped to the
		 * newest chunk.
		 *
		 * However, it is possible for a stream to transition to
		 * a "no-chunk" state. This can happen if a rotation
		 * occurs on an inactive session. In such cases, the metadata
		 * regeneration will happen when the next trace chunk is
		 * created.
		 */
		ret = consumer_metadata_stream_dump(stream);
		if (ret) {
			goto error;
		}
	}
	lttng_consumer_reset_stream_rotate_state(stream);

	ret = 0;

error:
	return ret;
}
4616
/*
 * Rotate all the ready streams now.
 *
 * This is especially important for low throughput streams that have already
 * been consumed, we cannot wait for their next packet to perform the
 * rotation.
 * Need to be called with RCU read-side lock held to ensure existence of
 * channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
{
	int ret;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	/* Streams are looked up by their parent channel's key. */
	struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;

	ASSERT_RCU_READ_LOCKED();

	/*
	 * NOTE(review): the caller already holds the RCU read-side lock (see
	 * assert above); this guard nests a second read-side critical section,
	 * which is legal with urcu and keeps this function self-sufficient.
	 */
	lttng::urcu::read_lock_guard read_lock;

	DBG("Consumer rotate ready streams in channel %" PRIu64, key);

	/* Visit every stream belonging to this channel. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		health_code_update();

		/*
		 * Lock ordering: channel lock before stream lock, consistent
		 * with the rest of the consumer code.
		 */
		pthread_mutex_lock(&stream->chan->lock);
		pthread_mutex_lock(&stream->lock);

		if (!stream->rotate_ready) {
			pthread_mutex_unlock(&stream->lock);
			pthread_mutex_unlock(&stream->chan->lock);
			continue;
		}
		DBG("Consumer rotate ready stream %" PRIu64, stream->key);

		ret = lttng_consumer_rotate_stream(stream);
		pthread_mutex_unlock(&stream->lock);
		pthread_mutex_unlock(&stream->chan->lock);
		if (ret) {
			/* Abort on first failure; 'ret' propagates the error. */
			goto end;
		}
	}

	ret = 0;

end:
	return ret;
}
4674
28ab034a
JG
4675enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
4676 const lttng_uuid& sessiond_uuid)
00fb02ac 4677{
d2956687 4678 enum lttcomm_return_code ret;
c70636a7 4679 char uuid_str[LTTNG_UUID_STR_LEN];
00fb02ac 4680
d2956687
JG
4681 if (ctx->sessiond_uuid.is_set) {
4682 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
00fb02ac
JD
4683 goto end;
4684 }
4685
d2956687 4686 ctx->sessiond_uuid.is_set = true;
328c2fe7 4687 ctx->sessiond_uuid.value = sessiond_uuid;
d2956687
JG
4688 ret = LTTCOMM_CONSUMERD_SUCCESS;
4689 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4690 DBG("Received session daemon UUID: %s", uuid_str);
00fb02ac
JD
4691end:
4692 return ret;
4693}
4694
/*
 * Create a trace chunk, publish it in the per-session chunk registry, attach
 * it to every channel of the session and, if applicable, announce it to the
 * relay daemon.
 *
 * relayd_id may be NULL (local trace); chunk_override_name, credentials and
 * chunk_directory_handle are optional. On any failure after the chunk has
 * been attached to channels, the creation is rolled back by closing the
 * chunk so the session daemon is left in a consistent state.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS or an LTTCOMM_CONSUMERD_* error code.
 */
enum lttcomm_return_code
lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
				  uint64_t session_id,
				  uint64_t chunk_id,
				  time_t chunk_creation_timestamp,
				  const char *chunk_override_name,
				  const struct lttng_credentials *credentials,
				  struct lttng_directory_handle *chunk_directory_handle)
{
	int ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
	enum lttng_trace_chunk_status chunk_status;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	char creation_timestamp_buffer[ISO8601_STR_LEN];
	const char *relayd_id_str = "(none)";
	const char *creation_timestamp_str;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	/* Local protocol error. */
	LTTNG_ASSERT(chunk_creation_timestamp);
	ret = time_to_iso8601_str(chunk_creation_timestamp,
				  creation_timestamp_buffer,
				  sizeof(creation_timestamp_buffer));
	creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";

	DBG("Consumer create trace chunk command: relay_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
	    ", chunk_creation_timestamp = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    chunk_override_name ?: "(none)",
	    creation_timestamp_str);

	/*
	 * The trace chunk registry, as used by the consumer daemon, implicitly
	 * owns the trace chunks. This is only needed in the consumer since
	 * the consumer has no notion of a session beyond session IDs being
	 * used to identify other objects.
	 *
	 * The lttng_trace_chunk_registry_publish() call below provides a
	 * reference which is not released; it implicitly becomes the session
	 * daemon's reference to the chunk in the consumer daemon.
	 *
	 * The lifetime of trace chunks in the consumer daemon is managed by
	 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
	 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
	 */
	created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
	if (!created_chunk) {
		ERR("Failed to create trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	if (chunk_override_name) {
		chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	if (chunk_directory_handle) {
		chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk credentials");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
		/*
		 * The consumer daemon has no ownership of the chunk output
		 * directory.
		 */
		chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
		chunk_directory_handle = nullptr;
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk's directory handle");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	published_chunk = lttng_trace_chunk_registry_publish_chunk(
		the_consumer_data.chunk_registry, session_id, created_chunk);
	/* The registry now holds the chunk; drop the local creation ref. */
	lttng_trace_chunk_put(created_chunk);
	created_chunk = nullptr;
	if (!published_chunk) {
		ERR("Failed to publish trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	/* Attach the published chunk to every channel of this session. */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry_duplicate(
			the_consumer_data.channels_by_session_id_ht->ht,
			the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
									      lttng_ht_seed),
			the_consumer_data.channels_by_session_id_ht->match_fct,
			&session_id,
			&iter.iter,
			channel,
			channels_by_session_id_ht_node.node)
		{
			ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
			if (ret) {
				/*
				 * Roll-back the creation of this chunk.
				 *
				 * This is important since the session daemon will
				 * assume that the creation of this chunk failed and
				 * will never ask for it to be closed, resulting
				 * in a leak and an inconsistent state for some
				 * channels.
				 */
				enum lttcomm_return_code close_ret;
				char path[LTTNG_PATH_MAX];

				DBG("Failed to set new trace chunk on existing channels, rolling back");
				close_ret =
					lttng_consumer_close_trace_chunk(relayd_id,
									 session_id,
									 chunk_id,
									 chunk_creation_timestamp,
									 nullptr,
									 path);
				if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
					ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
					    ", chunk_id = %" PRIu64,
					    session_id,
					    chunk_id);
				}

				ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
				break;
			}
		}
	}

	if (relayd_id) {
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			/* Serialize use of the relayd control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		if (!relayd || ret) {
			/* Roll back the chunk creation on relayd failure. */
			enum lttcomm_return_code close_ret;
			char path[LTTNG_PATH_MAX];

			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
								     session_id,
								     chunk_id,
								     chunk_creation_timestamp,
								     nullptr,
								     path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
				    ", chunk_id = %" PRIu64,
				    session_id,
				    chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
error:
	/* Release the reference returned by the "publish" operation. */
	lttng_trace_chunk_put(published_chunk);
	/* No-op unless an early error path left the creation ref alive. */
	lttng_trace_chunk_put(created_chunk);
	return ret_code;
}
4887
/*
 * Close a trace chunk: set its close timestamp (and optional close command),
 * detach it from any channel still referencing it and, if applicable, notify
 * the relay daemon.
 *
 * 'path' is an output buffer filled by relayd_close_trace_chunk() when a
 * relay daemon is involved.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS or an LTTCOMM_CONSUMERD_* error code.
 */
enum lttcomm_return_code
lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
				 uint64_t session_id,
				 uint64_t chunk_id,
				 time_t chunk_close_timestamp,
				 const enum lttng_trace_chunk_command_type *close_command,
				 char *path)
{
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *chunk;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	const char *close_command_name = "none";
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	enum lttng_trace_chunk_status chunk_status;

	if (relayd_id) {
		int ret;

		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}
	if (close_command) {
		close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
	}

	DBG("Consumer close trace chunk command: relayd_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    close_command_name);

	/* Takes a reference on the chunk; released (twice) at 'end'. */
	chunk = lttng_trace_chunk_registry_find_chunk(
		the_consumer_data.chunk_registry, session_id, chunk_id);
	if (!chunk) {
		ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
		    session_id,
		    chunk_id);
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		goto end;
	}

	if (close_command) {
		chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto end;
		}
	}

	/*
	 * chunk is now invalid to access as we no longer hold a reference to
	 * it; it is only kept around to compare it (by address) to the
	 * current chunk found in the session's channels.
	 */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (
			the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
			int ret;

			/*
			 * Only change the channel's chunk to NULL if it still
			 * references the chunk being closed. The channel may
			 * reference a newer channel in the case of a session
			 * rotation. When a session rotation occurs, the "next"
			 * chunk is created before the "current" chunk is closed.
			 */
			if (channel->trace_chunk != chunk) {
				continue;
			}
			ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
			if (ret) {
				/*
				 * Attempt to close the chunk on as many channels as
				 * possible.
				 */
				ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			}
		}
	}
	if (relayd_id) {
		int ret;
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			/* Serialize use of the relayd control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		if (!relayd || ret) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
end:
	/*
	 * Release the reference returned by the "find" operation and
	 * the session daemon's implicit reference to the chunk.
	 */
	lttng_trace_chunk_put(chunk);
	lttng_trace_chunk_put(chunk);

	return ret_code;
}
3654ed19 5011
/*
 * Query whether a trace chunk exists, first in the local chunk registry and
 * then, for network sessions, on the relay daemon.
 *
 * Returns one of:
 *  - LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL / _REMOTE,
 *  - LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK,
 *  - an error code (FATAL, INVALID_PARAMETERS, RELAYD_FAIL).
 */
enum lttcomm_return_code
lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
{
	int ret;
	enum lttcomm_return_code ret_code;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	/* A NULL relayd_id means the trace is written locally. */
	const bool is_local_trace = !relayd_id;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	bool chunk_exists_local, chunk_exists_remote;
	/* Held for the whole function; protects the relayd lookup below. */
	lttng::urcu::read_lock_guard read_lock;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	DBG("Consumer trace chunk exists command: relayd_id = %s"
	    ", chunk_id = %" PRIu64,
	    relayd_id_str,
	    chunk_id);
	ret = lttng_trace_chunk_registry_chunk_exists(
		the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
	if (ret) {
		/* Internal error. */
		ERR("Failed to query the existence of a trace chunk");
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto end;
	}
	DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
	if (chunk_exists_local) {
		ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
		goto end;
	} else if (is_local_trace) {
		/* Local trace and not found locally: the chunk is unknown. */
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	relayd = consumer_find_relayd(*relayd_id);
	if (!relayd) {
		ERR("Failed to find relayd %" PRIu64, *relayd_id);
		ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
		goto end_rcu_unlock;
	}
	DBG("Looking up existence of trace chunk on relay daemon");
	/* Serialize use of the relayd control socket. */
	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
	ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	if (ret < 0) {
		ERR("Failed to look-up the existence of trace chunk on relay daemon");
		ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
		goto end_rcu_unlock;
	}

	ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
					 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
	DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");

end_rcu_unlock:
end:
	return ret_code;
}
5f3aff8b 5079
/*
 * Clear every stream of a monitored channel.
 *
 * Iterates the streams registered under the channel's key and invokes
 * consumer_clear_stream() on each live stream, holding the stream lock to
 * guard against concurrent teardown.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success or the error value returned
 * by consumer_clear_stream() on the first failure.
 */
static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	int ret;

	ht = the_consumer_data.stream_per_chan_id_ht;

	lttng::urcu::read_lock_guard read_lock;
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			/* Stream is being torn down; skip it. */
			goto next;
		}
		ret = consumer_clear_stream(stream);
		if (ret) {
			goto error_unlock;
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	return LTTCOMM_CONSUMERD_SUCCESS;

error_unlock:
	/* The failing stream's lock is still held at this point. */
	pthread_mutex_unlock(&stream->lock);
	return ret;
}
5118
5119int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
5120{
5121 int ret;
5122
5123 DBG("Consumer clear channel %" PRIu64, channel->key);
5124
5125 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
5126 /*
5127 * Nothing to do for the metadata channel/stream.
5128 * Snapshot mechanism already take care of the metadata
5129 * handling/generation, and monitored channels only need to
5130 * have their data stream cleared..
5131 */
5132 ret = LTTCOMM_CONSUMERD_SUCCESS;
5133 goto end;
5134 }
5135
5136 if (!channel->monitor) {
5137 ret = consumer_clear_unmonitored_channel(channel);
5138 } else {
5139 ret = consumer_clear_monitored_channel(channel);
5140 }
5141end:
5142 return ret;
5143}
04ed9e10 5144
/*
 * Open a packet in every (data) stream of a channel.
 *
 * Rejected for metadata channels. Each stream's lock is held while opening
 * its packet; streams already removed from the hash table are skipped.
 * A NO_SPACE status is not an error (the buffer is simply full).
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS, INVALID_PARAMETERS (metadata channel)
 * or UNKNOWN_ERROR on an internal open-packet failure.
 */
enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream;
	enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;

	if (channel->metadata_stream) {
		ERR("Open channel packets command attempted on a metadata channel");
		ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
		goto end;
	}

	{
		lttng::urcu::read_lock_guard read_lock;
		cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
			enum consumer_stream_open_packet_status status;

			pthread_mutex_lock(&stream->lock);
			if (cds_lfht_is_node_deleted(&stream->node.node)) {
				/* Stream is being torn down; skip it. */
				goto next;
			}

			status = consumer_stream_open_packet(stream);
			switch (status) {
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
				DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel name = %s, session id = %" PRIu64,
				    stream->key,
				    stream->chan->name,
				    stream->chan->session_id);
				stream->opened_packet_in_current_trace_chunk = true;
				break;
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
				/* Full buffer: nothing to open, not an error. */
				DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel name = %s, session id = %" PRIu64,
				    stream->key,
				    stream->chan->name,
				    stream->chan->session_id);
				break;
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
				/*
				 * Only unexpected internal errors can lead to this
				 * failing. Report an unknown error.
				 */
				ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel id = %" PRIu64 ", channel name = %s"
				    ", session id = %" PRIu64,
				    stream->key,
				    channel->key,
				    channel->name,
				    channel->session_id);
				ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
				/*
				 * Leaves the guarded scope (releasing the RCU
				 * read lock) with the stream lock still held;
				 * released at error_unlock below.
				 */
				goto error_unlock;
			default:
				abort();
			}

		next:
			pthread_mutex_unlock(&stream->lock);
		}
	}
end_rcu_unlock:
end:
	return ret;

error_unlock:
	pthread_mutex_unlock(&stream->lock);
	goto end_rcu_unlock;
}
/*
 * SIGBUS handler entry point for the consumer daemon.
 *
 * Forwards the faulting address to the user-space (UST) consumer's
 * SIGBUS handler.
 */
void lttng_consumer_sigbus_handle(void *addr)
{
	lttng_ustconsumer_sigbus_handle(addr);
}
This page took 0.462333 seconds and 5 git commands to generate.