[lttng-tools.git] / src/common/consumer/consumer.cpp
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
c9e313bc
SM
11#include <common/align.hpp>
12#include <common/common.hpp>
13#include <common/compat/endian.hpp>
14#include <common/compat/poll.hpp>
15#include <common/consumer/consumer-metadata-cache.hpp>
16#include <common/consumer/consumer-stream.hpp>
17#include <common/consumer/consumer-testpoint.hpp>
18#include <common/consumer/consumer-timer.hpp>
19#include <common/consumer/consumer.hpp>
20#include <common/dynamic-array.hpp>
21#include <common/index/ctf-index.hpp>
22#include <common/index/index.hpp>
671e39d7 23#include <common/io-hint.hpp>
c9e313bc
SM
24#include <common/kernel-consumer/kernel-consumer.hpp>
25#include <common/kernel-ctl/kernel-ctl.hpp>
3d46ea1a 26#include <common/pthread-lock.hpp>
c9e313bc
SM
27#include <common/relayd/relayd.hpp>
28#include <common/sessiond-comm/relayd.hpp>
29#include <common/sessiond-comm/sessiond-comm.hpp>
30#include <common/string-utils/format.hpp>
31#include <common/time.hpp>
32#include <common/trace-chunk-registry.hpp>
33#include <common/trace-chunk.hpp>
56047f5a 34#include <common/urcu.hpp>
c9e313bc
SM
35#include <common/ust-consumer/ust-consumer.hpp>
36#include <common/utils.hpp>
3bd1e081 37
28ab034a 38#include <bin/lttng-consumerd/health-consumerd.hpp>
671e39d7 39#include <fcntl.h>
28ab034a
JG
40#include <inttypes.h>
41#include <poll.h>
42#include <pthread.h>
43#include <signal.h>
44#include <stdlib.h>
45#include <string.h>
46#include <sys/mman.h>
47#include <sys/socket.h>
48#include <sys/types.h>
fbd566c2 49#include <type_traits>
28ab034a
JG
50#include <unistd.h>
51
97535efa 52lttng_consumer_global_data the_consumer_data;
3bd1e081 53
d8ef542d
MD
54enum consumer_channel_action {
55 CONSUMER_CHANNEL_ADD,
a0cbdd2e 56 CONSUMER_CHANNEL_DEL,
d8ef542d
MD
57 CONSUMER_CHANNEL_QUIT,
58};
59
f1494934 60namespace {
d8ef542d
MD
61struct consumer_channel_msg {
62 enum consumer_channel_action action;
28ab034a
JG
63 struct lttng_consumer_channel *chan; /* add */
64 uint64_t key; /* del */
d8ef542d
MD
65};
66
f1494934
JG
67/*
68 * Global hash table containing respectively metadata and data streams. The
69 * stream element in this ht should only be updated by the metadata poll thread
70 * for the metadata and the data poll thread for the data.
71 */
72struct lttng_ht *metadata_ht;
73struct lttng_ht *data_ht;
74} /* namespace */
75
80957876 76/* Flag used to temporarily pause data consumption from testpoints. */
cf0bcb51
JG
77int data_consumption_paused;
78
3bd1e081
MD
79/*
80 * Flag to inform the polling thread to quit when all fds have hung up. Updated by
81 * consumer_thread_receive_fds when it notices that all fds have hung up.
82 * Also updated by the signal handler (consumer_should_exit()). Read by the
83 * polling threads.
84 */
10211f5c 85int consumer_quit;
3bd1e081 86
cd9adb8b 87static const char *get_consumer_domain()
5da88b0f 88{
fa29bfbf 89 switch (the_consumer_data.type) {
5da88b0f
MD
90 case LTTNG_CONSUMER_KERNEL:
91 return DEFAULT_KERNEL_TRACE_DIR;
92 case LTTNG_CONSUMER64_UST:
93 /* Fall-through. */
94 case LTTNG_CONSUMER32_UST:
95 return DEFAULT_UST_TRACE_DIR;
96 default:
97 abort();
98 }
99}
100
acdb9057
DG
101/*
102 * Notify a thread's lttng pipe to poll back again. This usually means that some
103 * global state has changed, so we simply wake the thread so that it re-enters
104 * its poll wait call.
105 */
106static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
107{
cd9adb8b 108 struct lttng_consumer_stream *null_stream = nullptr;
acdb9057 109
a0377dfe 110 LTTNG_ASSERT(pipe);
acdb9057 111
5c7248cd
JG
112 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
113 pointer. */
acdb9057
DG
114}
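
/*
 * The wake-up token written above is simply a NULL stream pointer: the value
 * carries no payload, it only makes the pipe readable so that the target
 * thread re-enters its poll loop and re-reads the global state. A minimal
 * sketch of the expected reader side (assuming the matching lttng_pipe_read()
 * helper from common/pipe.hpp):
 *
 *	struct lttng_consumer_stream *stream;
 *
 *	if (lttng_pipe_read(pipe, &stream, sizeof(stream)) == sizeof(stream) &&
 *			stream == nullptr) {
 *		// NULL means "no new stream, just poll again".
 *	}
 */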
115
5c635c72
MD
116static void notify_health_quit_pipe(int *pipe)
117{
6cd525e8 118 ssize_t ret;
5c635c72 119
6cd525e8
MD
120 ret = lttng_write(pipe[1], "4", 1);
121 if (ret < 1) {
5c635c72
MD
122 PERROR("write consumer health quit");
123 }
124}
125
d8ef542d 126static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
127 struct lttng_consumer_channel *chan,
128 uint64_t key,
129 enum consumer_channel_action action)
d8ef542d
MD
130{
131 struct consumer_channel_msg msg;
6cd525e8 132 ssize_t ret;
d8ef542d 133
e56251fc
DG
134 memset(&msg, 0, sizeof(msg));
135
d8ef542d
MD
136 msg.action = action;
137 msg.chan = chan;
f21dae48 138 msg.key = key;
6cd525e8
MD
139 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
140 if (ret < sizeof(msg)) {
141 PERROR("notify_channel_pipe write error");
142 }
d8ef542d
MD
143}
144
28ab034a 145void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
a0cbdd2e 146{
cd9adb8b 147 notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
a0cbdd2e
MD
148}
149
d8ef542d 150static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
151 struct lttng_consumer_channel **chan,
152 uint64_t *key,
153 enum consumer_channel_action *action)
d8ef542d
MD
154{
155 struct consumer_channel_msg msg;
6cd525e8 156 ssize_t ret;
d8ef542d 157
6cd525e8
MD
158 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
159 if (ret < sizeof(msg)) {
160 ret = -1;
161 goto error;
d8ef542d 162 }
6cd525e8
MD
163 *action = msg.action;
164 *chan = msg.chan;
165 *key = msg.key;
166error:
167 return (int) ret;
d8ef542d
MD
168}
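
/*
 * The channel pipe protocol is a stream of fixed-size consumer_channel_msg
 * records: notify_channel_pipe() writes exactly sizeof(msg) bytes and
 * read_channel_pipe() fails on anything shorter, so a record is always
 * consumed whole. A hedged sketch of how a polling thread could dispatch on
 * it (illustration only, not the actual channel thread code):
 *
 *	struct lttng_consumer_channel *chan;
 *	uint64_t key;
 *	enum consumer_channel_action action;
 *
 *	if (read_channel_pipe(ctx, &chan, &key, &action) > 0) {
 *		switch (action) {
 *		case CONSUMER_CHANNEL_ADD:
 *			// 'chan' is valid and must be registered for polling.
 *			break;
 *		case CONSUMER_CHANNEL_DEL:
 *			// Only 'key' is meaningful; look the channel up.
 *			break;
 *		case CONSUMER_CHANNEL_QUIT:
 *			// Tear down the thread.
 *			break;
 *		}
 *	}
 */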
169
212d67a2
DG
170/*
171 * Clean up the stream list of a channel. Those streams are not yet globally
172 * visible.
173 */
174static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
175{
a0377dfe 176 LTTNG_ASSERT(channel);
212d67a2
DG
177
178 /* Delete streams that might have been left in the stream list. */
a1a1df65
JG
179 for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
180 &lttng_consumer_stream::send_node>(
181 channel->streams.head)) {
cd9adb8b 182 consumer_stream_destroy(stream, nullptr);
212d67a2
DG
183 }
184}
185
3bd1e081
MD
186/*
187 * Find a stream. The consumer_data.lock must be locked during this
188 * call.
189 */
28ab034a 190static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
3bd1e081 191{
e4421fec 192 struct lttng_ht_iter iter;
d88aee68 193 struct lttng_ht_node_u64 *node;
cd9adb8b 194 struct lttng_consumer_stream *stream = nullptr;
3bd1e081 195
a0377dfe 196 LTTNG_ASSERT(ht);
8389e4f8 197
d88aee68
DG
198 /* -1ULL keys are lookup failures */
199 if (key == (uint64_t) -1ULL) {
cd9adb8b 200 return nullptr;
7a57cf92 201 }
e4421fec 202
07c4863f 203 const lttng::urcu::read_lock_guard read_lock;
6065ceec 204
d88aee68 205 lttng_ht_lookup(ht, &key, &iter);
00d7d903 206 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
cd9adb8b 207 if (node != nullptr) {
0114db0e 208 stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
3bd1e081 209 }
e4421fec
DG
210
211 return stream;
3bd1e081
MD
212}
213
da009f2c 214static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
7ad0a0cb
MD
215{
216 struct lttng_consumer_stream *stream;
217
07c4863f 218 const lttng::urcu::read_lock_guard read_lock;
ffe60014 219 stream = find_stream(key, ht);
04253271 220 if (stream) {
da009f2c 221 stream->key = (uint64_t) -1ULL;
04253271
MD
222 /*
223 * We don't want the lookup to match, but we still need
224 * to iterate on this stream when iterating over the hash table. Just
225 * change the node key.
226 */
da009f2c 227 stream->node.key = (uint64_t) -1ULL;
04253271 228 }
7ad0a0cb
MD
229}
230
d56db448
DG
231/*
232 * Return a channel object for the given key.
233 *
234 * RCU read side lock MUST be acquired before calling this function and
235 * protects the channel ptr.
236 */
d88aee68 237struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
3bd1e081 238{
e4421fec 239 struct lttng_ht_iter iter;
d88aee68 240 struct lttng_ht_node_u64 *node;
cd9adb8b 241 struct lttng_consumer_channel *channel = nullptr;
3bd1e081 242
48b7cdc2
FD
243 ASSERT_RCU_READ_LOCKED();
244
d88aee68
DG
245 /* -1ULL keys are lookup failures */
246 if (key == (uint64_t) -1ULL) {
cd9adb8b 247 return nullptr;
7a57cf92 248 }
e4421fec 249
fa29bfbf 250 lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
00d7d903 251 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
cd9adb8b 252 if (node != nullptr) {
0114db0e 253 channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
3bd1e081 254 }
e4421fec
DG
255
256 return channel;
3bd1e081
MD
257}
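
/*
 * Typical use of consumer_find_channel(): the RCU read-side lock must cover
 * both the lookup and every dereference of the returned pointer. A minimal
 * sketch using the guard type already used throughout this file:
 *
 *	{
 *		const lttng::urcu::read_lock_guard read_lock;
 *		struct lttng_consumer_channel *chan = consumer_find_channel(key);
 *
 *		if (chan) {
 *			// Safe to use 'chan' while the guard is alive.
 *		}
 *	}
 */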
258
b5a6470f
DG
259/*
260 * There is a possibility that the consumer does not have enough time between
261 * the close of the channel on the session daemon and the cleanup here. Thus,
262 * once we get a channel add with an existing key, we know for sure that this
263 * channel will eventually get cleaned up by all streams being closed.
264 *
265 * This function just nullifies the already existing channel key.
266 */
267static void steal_channel_key(uint64_t key)
268{
269 struct lttng_consumer_channel *channel;
270
07c4863f 271 const lttng::urcu::read_lock_guard read_lock;
b5a6470f
DG
272 channel = consumer_find_channel(key);
273 if (channel) {
274 channel->key = (uint64_t) -1ULL;
275 /*
276 * We don't want the lookup to match, but we still need to iterate on
277 * this channel when iterating over the hash table. Just change the
278 * node key.
279 */
280 channel->node.key = (uint64_t) -1ULL;
281 }
b5a6470f
DG
282}
283
ffe60014 284static void free_channel_rcu(struct rcu_head *head)
702b1ea4 285{
28ab034a 286 struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
ffe60014 287 struct lttng_consumer_channel *channel =
0114db0e 288 lttng::utils::container_of(node, &lttng_consumer_channel::node);
702b1ea4 289
fa29bfbf 290 switch (the_consumer_data.type) {
b83e03c4
MD
291 case LTTNG_CONSUMER_KERNEL:
292 break;
293 case LTTNG_CONSUMER32_UST:
294 case LTTNG_CONSUMER64_UST:
295 lttng_ustconsumer_free_channel(channel);
296 break;
297 default:
298 ERR("Unknown consumer_data type");
299 abort();
300 }
32670d71
JG
301
302 delete channel;
702b1ea4
MD
303}
304
00e2e675
DG
305/*
306 * RCU protected relayd socket pair free.
307 */
ffe60014 308static void free_relayd_rcu(struct rcu_head *head)
00e2e675 309{
28ab034a 310 struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
00e2e675 311 struct consumer_relayd_sock_pair *relayd =
0114db0e 312 lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
00e2e675 313
8994307f
DG
314 /*
315 * Close all sockets. This is done in the call_rcu() callback since we don't
316 * want the socket fds to be reassigned, thus potentially creating a bad state
317 * for the relayd object.
318 *
319 * We do not have to lock the control socket mutex here since at this stage
320 * there is no one referencing this relayd object.
321 */
322 (void) relayd_close(&relayd->control_sock);
323 (void) relayd_close(&relayd->data_sock);
324
3a84e2f3 325 pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
00e2e675
DG
326 free(relayd);
327}
328
329/*
330 * Destroy and free relayd socket pair object.
00e2e675 331 */
51230d70 332void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
333{
334 int ret;
335 struct lttng_ht_iter iter;
336
cd9adb8b 337 if (relayd == nullptr) {
173af62f
DG
338 return;
339 }
340
00e2e675
DG
341 DBG("Consumer destroy and close relayd socket pair");
342
343 iter.iter.node = &relayd->node.node;
fa29bfbf 344 ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
173af62f 345 if (ret != 0) {
8994307f 346 /* We assume the relayd is being or is destroyed */
173af62f
DG
347 return;
348 }
00e2e675 349
00e2e675 350 /* RCU free() call */
ffe60014
DG
351 call_rcu(&relayd->node.head, free_relayd_rcu);
352}
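
/*
 * Note on the destruction scheme above: the relayd object is unlinked from
 * the hash table immediately, but the sockets are only closed and the memory
 * only freed by free_relayd_rcu() once an RCU grace period has elapsed, so
 * readers that looked the object up under the RCU read-side lock never see a
 * closed socket or freed memory.
 */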
353
354/*
355 * Remove a channel from the global list protected by a mutex. This function is
356 * also responsible for freeing its data structures.
357 */
358void consumer_del_channel(struct lttng_consumer_channel *channel)
359{
ffe60014
DG
360 struct lttng_ht_iter iter;
361
d88aee68 362 DBG("Consumer delete channel key %" PRIu64, channel->key);
ffe60014 363
fa29bfbf 364 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 365 pthread_mutex_lock(&channel->lock);
ffe60014 366
212d67a2
DG
367 /* Destroy streams that might have been left in the stream list. */
368 clean_channel_stream_list(channel);
51e762e5 369
d3e2ba59
JD
370 if (channel->live_timer_enabled == 1) {
371 consumer_timer_live_stop(channel);
372 }
e9404c27
JG
373 if (channel->monitor_timer_enabled == 1) {
374 consumer_timer_monitor_stop(channel);
375 }
d3e2ba59 376
319dcddc
JG
377 /*
378 * Send a last buffer statistics sample to the session daemon
379 * to ensure it tracks the amount of data consumed by this channel.
380 */
381 sample_and_send_channel_buffer_stats(channel);
382
fa29bfbf 383 switch (the_consumer_data.type) {
ffe60014
DG
384 case LTTNG_CONSUMER_KERNEL:
385 break;
386 case LTTNG_CONSUMER32_UST:
387 case LTTNG_CONSUMER64_UST:
388 lttng_ustconsumer_del_channel(channel);
389 break;
390 default:
391 ERR("Unknown consumer_data type");
a0377dfe 392 abort();
ffe60014
DG
393 goto end;
394 }
395
d2956687 396 lttng_trace_chunk_put(channel->trace_chunk);
cd9adb8b 397 channel->trace_chunk = nullptr;
5c3892a6 398
d2956687
JG
399 if (channel->is_published) {
400 int ret;
401
07c4863f 402 const lttng::urcu::read_lock_guard read_lock;
d2956687 403 iter.iter.node = &channel->node.node;
fa29bfbf 404 ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
a0377dfe 405 LTTNG_ASSERT(!ret);
ffe60014 406
d2956687 407 iter.iter.node = &channel->channels_by_session_id_ht_node.node;
28ab034a 408 ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
a0377dfe 409 LTTNG_ASSERT(!ret);
d2956687
JG
410 }
411
b6921a17
JG
412 channel->is_deleted = true;
413 call_rcu(&channel->node.head, free_channel_rcu);
ffe60014 414end:
a9838785 415 pthread_mutex_unlock(&channel->lock);
fa29bfbf 416 pthread_mutex_unlock(&the_consumer_data.lock);
00e2e675
DG
417}
418
228b5bf7
DG
419/*
420 * Iterate over the relayd hash table and destroy each element. Finally,
421 * destroy the whole hash table.
422 */
cd9adb8b 423static void cleanup_relayd_ht()
228b5bf7 424{
c3ade133
JG
425 for (auto *relayd :
426 lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
427 decltype(consumer_relayd_sock_pair::node),
428 &consumer_relayd_sock_pair::node>(
429 *the_consumer_data.relayd_ht->ht)) {
430 consumer_destroy_relayd(relayd);
228b5bf7
DG
431 }
432
fa29bfbf 433 lttng_ht_destroy(the_consumer_data.relayd_ht);
228b5bf7
DG
434}
435
8994307f
DG
436/*
437 * Update the end point status of all streams having the given network sequence
438 * index (relayd index).
439 *
440 * It's atomically set without having the stream mutex locked which is fine
441 * because we handle the write/read race with a pipe wakeup for each thread.
442 */
da009f2c 443static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
28ab034a 444 enum consumer_endpoint_status status)
8994307f 445{
da009f2c 446 DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
8994307f 447
8994307f 448 /* Let's begin with metadata */
c3ade133
JG
449 for (auto *stream :
450 lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
451 decltype(lttng_consumer_stream::node),
452 &lttng_consumer_stream::node>(*metadata_ht->ht)) {
8994307f
DG
453 if (stream->net_seq_idx == net_seq_idx) {
454 uatomic_set(&stream->endpoint_status, status);
32670d71 455 stream->chan->metadata_pushed_wait_queue.wake_all();
f40b76ae 456
8994307f
DG
457 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
458 }
459 }
460
461 /* Follow up by the data streams */
c3ade133
JG
462 for (auto *stream :
463 lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
464 decltype(lttng_consumer_stream::node),
465 &lttng_consumer_stream::node>(*data_ht->ht)) {
8994307f
DG
466 if (stream->net_seq_idx == net_seq_idx) {
467 uatomic_set(&stream->endpoint_status, status);
468 DBG("Delete flag set to data stream %d", stream->wait_fd);
469 }
470 }
8994307f
DG
471}
472
473/*
474 * Clean up a relayd object by flagging every associated stream for deletion,
475 * destroying the object, meaning removing it from the relayd hash table,
476 * closing the sockets and freeing the memory in an RCU call.
477 *
478 * If a local data context is available, notify the threads that the streams'
479 * state has changed.
480 */
9276e5c8 481void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
8994307f 482{
da009f2c 483 uint64_t netidx;
8994307f 484
a0377dfe 485 LTTNG_ASSERT(relayd);
8994307f 486
97535efa 487 DBG("Cleaning up relayd object ID %" PRIu64, relayd->net_seq_idx);
9617607b 488
8994307f
DG
489 /* Save the net sequence index before destroying the object */
490 netidx = relayd->net_seq_idx;
491
492 /*
493 * Delete the relayd from the relayd hash table, close the sockets and free
494 * the object in a RCU call.
495 */
51230d70 496 consumer_destroy_relayd(relayd);
8994307f
DG
497
498 /* Set inactive endpoint to all streams */
499 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
500
501 /*
502 * With a local data context, notify the threads that the streams' state
503 * have changed. The write() action on the pipe acts as an "implicit"
504 * memory barrier ordering the updates of the end point status from the
505 * read of this status which happens AFTER receiving this notify.
506 */
9276e5c8
JR
507 notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
508 notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
8994307f
DG
509}
510
a6ba4fe1
DG
511/*
512 * Flag a relayd socket pair for destruction. Destroy it if the refcount
513 * reaches zero.
514 *
515 * RCU read side lock MUST be acquired before calling this function.
516 */
517void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
518{
a0377dfe 519 LTTNG_ASSERT(relayd);
48b7cdc2 520 ASSERT_RCU_READ_LOCKED();
a6ba4fe1
DG
521
522 /* Set destroy flag for this object */
523 uatomic_set(&relayd->destroy_flag, 1);
524
525 /* Destroy the relayd if refcount is 0 */
526 if (uatomic_read(&relayd->refcount) == 0) {
51230d70 527 consumer_destroy_relayd(relayd);
a6ba4fe1
DG
528 }
529}
530
3bd1e081 531/*
1d1a276c
DG
532 * Completely destroy a stream from every visible data structure and from the
533 * given hash table, if one is provided.
534 *
535 * Once this call returns, the stream object is no longer usable nor visible.
3bd1e081 536 */
28ab034a 537void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
3bd1e081 538{
1d1a276c 539 consumer_stream_destroy(stream, ht);
3bd1e081
MD
540}
541
5ab66908
MD
542/*
543 * XXX naming of del vs destroy is all mixed up.
544 */
545void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
546{
547 consumer_stream_destroy(stream, data_ht);
548}
549
550void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
551{
552 consumer_stream_destroy(stream, metadata_ht);
553}
554
28ab034a
JG
555void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
556 struct lttng_consumer_channel *channel)
d9a2e16e 557{
28ab034a 558 stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
d9a2e16e
JD
559}
560
3bd1e081
MD
561/*
562 * Add a stream to the global list protected by a mutex.
563 */
66d583dc 564void consumer_add_data_stream(struct lttng_consumer_stream *stream)
3bd1e081 565{
5ab66908 566 struct lttng_ht *ht = data_ht;
3bd1e081 567
a0377dfe
FD
568 LTTNG_ASSERT(stream);
569 LTTNG_ASSERT(ht);
c77fc10a 570
d88aee68 571 DBG3("Adding consumer stream %" PRIu64, stream->key);
e316aad5 572
fa29bfbf 573 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 574 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 575 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 576 pthread_mutex_lock(&stream->lock);
07c4863f 577 const lttng::urcu::read_lock_guard read_lock;
e316aad5 578
43c34bc3 579 /* Steal stream identifier to avoid having streams with the same key */
ffe60014 580 steal_stream_key(stream->key, ht);
43c34bc3 581
d88aee68 582 lttng_ht_add_unique_u64(ht, &stream->node);
00e2e675 583
28ab034a 584 lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
d8ef542d 585
ca22feea
DG
586 /*
587 * Add stream to the stream_list_ht of the consumer data. No need to steal
588 * the key since the HT does not use it and we allow to add redundant keys
589 * into this table.
590 */
28ab034a 591 lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 592
e316aad5 593 /*
ffe60014
DG
594 * When nb_init_stream_left reaches 0, we don't need to trigger any action
595 * in terms of destroying the associated channel, because the action that
e316aad5
DG
596 * causes the count to become 0 also causes a stream to be added. The
597 * channel deletion will thus be triggered by the following removal of this
598 * stream.
599 */
ffe60014 600 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
601 /* Increment refcount before decrementing nb_init_stream_left */
602 cmm_smp_wmb();
ffe60014 603 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
604 }
605
606 /* Update consumer data once the node is inserted. */
fa29bfbf
SM
607 the_consumer_data.stream_count++;
608 the_consumer_data.need_update = 1;
3bd1e081 609
2e818a6a 610 pthread_mutex_unlock(&stream->lock);
ec6ea7d0 611 pthread_mutex_unlock(&stream->chan->timer_lock);
a9838785 612 pthread_mutex_unlock(&stream->chan->lock);
fa29bfbf 613 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
614}
615
00e2e675 616/*
3f8e211f
DG
617 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
618 * be acquired before calling this.
00e2e675 619 */
d09e1200 620static int add_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675 621{
07c4863f 622 const int ret = 0;
d88aee68 623 struct lttng_ht_node_u64 *node;
00e2e675
DG
624 struct lttng_ht_iter iter;
625
a0377dfe 626 LTTNG_ASSERT(relayd);
48b7cdc2 627 ASSERT_RCU_READ_LOCKED();
00e2e675 628
28ab034a 629 lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
00d7d903 630 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
cd9adb8b 631 if (node != nullptr) {
00e2e675
DG
632 goto end;
633 }
fa29bfbf 634 lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
00e2e675 635
00e2e675
DG
636end:
637 return ret;
638}
639
640/*
641 * Allocate and return a consumer relayd socket.
642 */
28ab034a 643static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
00e2e675 644{
cd9adb8b 645 struct consumer_relayd_sock_pair *obj = nullptr;
00e2e675 646
da009f2c
MD
647 /* net sequence index of -1 is a failure */
648 if (net_seq_idx == (uint64_t) -1ULL) {
00e2e675
DG
649 goto error;
650 }
651
64803277 652 obj = zmalloc<consumer_relayd_sock_pair>();
cd9adb8b 653 if (obj == nullptr) {
00e2e675
DG
654 PERROR("zmalloc relayd sock");
655 goto error;
656 }
657
658 obj->net_seq_idx = net_seq_idx;
659 obj->refcount = 0;
173af62f 660 obj->destroy_flag = 0;
f96e4545
MD
661 obj->control_sock.sock.fd = -1;
662 obj->data_sock.sock.fd = -1;
d88aee68 663 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
cd9adb8b 664 pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
00e2e675
DG
665
666error:
667 return obj;
668}
669
670/*
671 * Find a relayd socket pair in the global consumer data.
672 *
673 * Return the object if found else NULL.
b0b335c8
MD
674 * RCU read-side lock must be held across this call and while using the
675 * returned object.
00e2e675 676 */
d88aee68 677struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
00e2e675
DG
678{
679 struct lttng_ht_iter iter;
d88aee68 680 struct lttng_ht_node_u64 *node;
cd9adb8b 681 struct consumer_relayd_sock_pair *relayd = nullptr;
00e2e675 682
48b7cdc2
FD
683 ASSERT_RCU_READ_LOCKED();
684
00e2e675 685 /* Negative keys are lookup failures */
d88aee68 686 if (key == (uint64_t) -1ULL) {
00e2e675
DG
687 goto error;
688 }
689
fa29bfbf 690 lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
00d7d903 691 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
cd9adb8b 692 if (node != nullptr) {
0114db0e 693 relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
00e2e675
DG
694 }
695
00e2e675
DG
696error:
697 return relayd;
698}
699
10a50311
JD
700/*
701 * Find a relayd and send the stream
702 *
703 * Returns 0 on success, < 0 on error
704 */
28ab034a 705int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
10a50311
JD
706{
707 int ret = 0;
708 struct consumer_relayd_sock_pair *relayd;
709
a0377dfe
FD
710 LTTNG_ASSERT(stream);
711 LTTNG_ASSERT(stream->net_seq_idx != -1ULL);
712 LTTNG_ASSERT(path);
10a50311
JD
713
714 /* The stream is not metadata. Get relayd reference if exists. */
07c4863f 715 const lttng::urcu::read_lock_guard read_lock;
10a50311 716 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 717 if (relayd != nullptr) {
10a50311
JD
718 /* Add stream on the relayd */
719 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a
JG
720 ret = relayd_add_stream(&relayd->control_sock,
721 stream->name,
722 get_consumer_domain(),
723 path,
724 &stream->relayd_stream_id,
725 stream->chan->tracefile_size,
726 stream->chan->tracefile_count,
727 stream->trace_chunk);
10a50311
JD
728 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
729 if (ret < 0) {
28ab034a
JG
730 ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
731 relayd->net_seq_idx);
9276e5c8 732 lttng_consumer_cleanup_relayd(relayd);
10a50311
JD
733 goto end;
734 }
1c20f0e2 735
10a50311 736 uatomic_inc(&relayd->refcount);
d01178b6 737 stream->sent_to_relayd = 1;
10a50311
JD
738 } else {
739 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
28ab034a
JG
740 stream->key,
741 stream->net_seq_idx);
10a50311
JD
742 ret = -1;
743 goto end;
744 }
745
746 DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
28ab034a
JG
747 stream->name,
748 stream->key,
749 stream->net_seq_idx);
10a50311
JD
750
751end:
10a50311
JD
752 return ret;
753}
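
/*
 * The pattern above is the one used for every command sent on the relayd
 * control socket: take relayd->ctrl_sock_mutex, issue the command, release
 * the mutex, and on error tear the whole relayd object down with
 * lttng_consumer_cleanup_relayd() so its streams are flagged inactive. A
 * minimal sketch of a hypothetical command helper following that pattern
 * (relayd_some_command() is a placeholder, not a real API):
 *
 *	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
 *	ret = relayd_some_command(&relayd->control_sock);
 *	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 *	if (ret < 0) {
 *		lttng_consumer_cleanup_relayd(relayd);
 *	}
 */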
754
a4baae1b
JD
755/*
756 * Find a relayd and send the streams sent message
757 *
758 * Returns 0 on success, < 0 on error
759 */
760int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
761{
762 int ret = 0;
763 struct consumer_relayd_sock_pair *relayd;
764
a0377dfe 765 LTTNG_ASSERT(net_seq_idx != -1ULL);
a4baae1b
JD
766
767 /* The stream is not metadata. Get relayd reference if exists. */
07c4863f 768 const lttng::urcu::read_lock_guard read_lock;
a4baae1b 769 relayd = consumer_find_relayd(net_seq_idx);
cd9adb8b 770 if (relayd != nullptr) {
a4baae1b
JD
771 /* Add stream on the relayd */
772 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
773 ret = relayd_streams_sent(&relayd->control_sock);
774 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
775 if (ret < 0) {
28ab034a
JG
776 ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
777 relayd->net_seq_idx);
9276e5c8 778 lttng_consumer_cleanup_relayd(relayd);
a4baae1b
JD
779 goto end;
780 }
781 } else {
28ab034a 782 ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
a4baae1b
JD
783 ret = -1;
784 goto end;
785 }
786
787 ret = 0;
788 DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
789
790end:
a4baae1b
JD
791 return ret;
792}
793
10a50311
JD
794/*
795 * Find a relayd and close the stream
796 */
797void close_relayd_stream(struct lttng_consumer_stream *stream)
798{
799 struct consumer_relayd_sock_pair *relayd;
800
801 /* The stream is not metadata. Get relayd reference if exists. */
07c4863f 802 const lttng::urcu::read_lock_guard read_lock;
10a50311
JD
803 relayd = consumer_find_relayd(stream->net_seq_idx);
804 if (relayd) {
805 consumer_stream_relayd_close(stream, relayd);
806 }
10a50311
JD
807}
808
00e2e675
DG
809/*
810 * Handle a stream for relayd transmission if the stream is set up for network
811 * streaming, i.e. its net sequence index is set.
812 *
813 * Return destination file descriptor or negative value on error.
814 */
6197aea7 815static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
28ab034a
JG
816 size_t data_size,
817 unsigned long padding,
818 struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
819{
820 int outfd = -1, ret;
00e2e675
DG
821 struct lttcomm_relayd_data_hdr data_hdr;
822
823 /* Safety net */
a0377dfe
FD
824 LTTNG_ASSERT(stream);
825 LTTNG_ASSERT(relayd);
00e2e675
DG
826
827 /* Reset data header */
828 memset(&data_hdr, 0, sizeof(data_hdr));
829
00e2e675
DG
830 if (stream->metadata_flag) {
831 /* Caller MUST acquire the relayd control socket lock */
832 ret = relayd_send_metadata(&relayd->control_sock, data_size);
833 if (ret < 0) {
834 goto error;
835 }
836
837 /* Metadata are always sent on the control socket. */
6151a90f 838 outfd = relayd->control_sock.sock.fd;
00e2e675
DG
839 } else {
840 /* Set header with stream information */
841 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
842 data_hdr.data_size = htobe32(data_size);
1d4dfdef 843 data_hdr.padding_size = htobe32(padding);
c35f9726 844
39df6d9f
DG
845 /*
846 * Note that net_seq_num below is assigned the *current* value of
847 * next_net_seq_num; only after that is next_net_seq_num incremented. This is
848 * why, when issuing a command on the relayd using this next value, 1 should
849 * always be subtracted in order to compare the last sequence number seen on
850 * the relayd side to the last one sent.
851 */
3604f373 852 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
00e2e675
DG
853 /* Other fields are zeroed previously */
854
28ab034a 855 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
00e2e675
DG
856 if (ret < 0) {
857 goto error;
858 }
859
3604f373
DG
860 ++stream->next_net_seq_num;
861
00e2e675 862 /* Set to go on data socket */
6151a90f 863 outfd = relayd->data_sock.sock.fd;
00e2e675
DG
864 }
865
866error:
867 return outfd;
868}
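
/*
 * Wire format recap for the function above: metadata is written to the
 * control socket after a relayd_send_metadata() announcement, while data is
 * written to the data socket preceded by a lttcomm_relayd_data_hdr whose
 * fields (stream id, payload size, padding size, net sequence number) are
 * converted to big-endian with htobe32()/htobe64() before being sent.
 */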
869
b1316da1
JG
870/*
871 * Write a character on the metadata poll pipe to wake the metadata thread.
872 * Returns 0 on success, -1 on error.
873 */
874int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
875{
876 int ret = 0;
877
28ab034a 878 DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
b1316da1
JG
879 if (channel->monitor && channel->metadata_stream) {
880 const char dummy = 'c';
28ab034a
JG
881 const ssize_t write_ret =
882 lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);
b1316da1
JG
883
884 if (write_ret < 1) {
885 if (errno == EWOULDBLOCK) {
886 /*
887 * This is fine, the metadata poll thread
888 * is having a hard time keeping-up, but
889 * it will eventually wake-up and consume
890 * the available data.
891 */
892 ret = 0;
893 } else {
894 PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
895 ret = -1;
896 goto end;
897 }
898 }
899 }
900
901end:
902 return ret;
903}
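
/*
 * The byte written above is a pure wake-up token: its value is irrelevant,
 * it only makes ust_metadata_poll_pipe[0] readable so that the metadata poll
 * thread runs another iteration. EWOULDBLOCK means the pipe is already full
 * of pending wake-ups, which is why it is not treated as an error.
 */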
904
d2956687
JG
905/*
906 * Trigger a dump of the metadata content. Following/during the successful
907 * completion of this call, the metadata poll thread will start receiving
908 * metadata packets to consume.
909 *
910 * The caller must hold the channel and stream locks.
911 */
28ab034a 912static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
d2956687
JG
913{
914 int ret;
915
916 ASSERT_LOCKED(stream->chan->lock);
917 ASSERT_LOCKED(stream->lock);
a0377dfe
FD
918 LTTNG_ASSERT(stream->metadata_flag);
919 LTTNG_ASSERT(stream->chan->trace_chunk);
d2956687 920
fa29bfbf 921 switch (the_consumer_data.type) {
d2956687
JG
922 case LTTNG_CONSUMER_KERNEL:
923 /*
924 * Reset the position of what has been read from the
925 * metadata cache to 0 so we can dump it again.
926 */
927 ret = kernctl_metadata_cache_dump(stream->wait_fd);
928 break;
929 case LTTNG_CONSUMER32_UST:
930 case LTTNG_CONSUMER64_UST:
931 /*
932 * Reset the position pushed from the metadata cache so it
933 * will write from the beginning on the next push.
934 */
935 stream->ust_metadata_pushed = 0;
936 ret = consumer_metadata_wakeup_pipe(stream->chan);
937 break;
938 default:
939 ERR("Unknown consumer_data type");
940 abort();
941 }
942 if (ret < 0) {
943 ERR("Failed to dump the metadata cache");
944 }
945 return ret;
946}
947
28ab034a
JG
948static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
949 struct lttng_trace_chunk *new_trace_chunk)
d2956687 950{
d2956687 951 pthread_mutex_lock(&channel->lock);
b6921a17
JG
952 if (channel->is_deleted) {
953 /*
954 * The channel has been logically deleted and should no longer
955 * be used. It has released its reference to its current trace
956 * chunk and should not acquire a new one.
957 *
958 * Return success as there is nothing for the caller to do.
959 */
960 goto end;
961 }
d2956687
JG
962
963 /*
964 * The acquisition of the reference cannot fail (barring
965 * a severe internal error) since a reference to the published
966 * chunk is already held by the caller.
967 */
968 if (new_trace_chunk) {
28ab034a 969 const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);
d2956687 970
a0377dfe 971 LTTNG_ASSERT(acquired_reference);
d2956687
JG
972 }
973
974 lttng_trace_chunk_put(channel->trace_chunk);
975 channel->trace_chunk = new_trace_chunk;
d2956687
JG
976end:
977 pthread_mutex_unlock(&channel->lock);
ce1aa6fe 978 return 0;
d2956687
JG
979}
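
/*
 * Reference-counting contract of the function above: a reference is acquired
 * on new_trace_chunk (when non-NULL) and the channel's previous chunk, if
 * any, is released. Passing a NULL new_trace_chunk therefore simply drops the
 * channel's current reference, e.g.:
 *
 *	// Detach the channel from its trace chunk.
 *	(void) lttng_consumer_channel_set_trace_chunk(channel, nullptr);
 */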
980
3bd1e081 981/*
ffe60014
DG
982 * Allocate and return a new lttng_consumer_channel object using the given key
983 * to initialize the hash table node.
984 *
985 * On error, return NULL.
3bd1e081 986 */
886224ff 987struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
28ab034a
JG
988 uint64_t session_id,
989 const uint64_t *chunk_id,
990 const char *pathname,
991 const char *name,
992 uint64_t relayd_id,
993 enum lttng_event_output output,
994 uint64_t tracefile_size,
995 uint64_t tracefile_count,
996 uint64_t session_id_per_pid,
997 unsigned int monitor,
998 unsigned int live_timer_interval,
999 bool is_in_live_session,
1000 const char *root_shm_path,
1001 const char *shm_path)
3bd1e081 1002{
cd9adb8b
JG
1003 struct lttng_consumer_channel *channel = nullptr;
1004 struct lttng_trace_chunk *trace_chunk = nullptr;
d2956687
JG
1005
1006 if (chunk_id) {
1007 trace_chunk = lttng_trace_chunk_registry_find_chunk(
28ab034a 1008 the_consumer_data.chunk_registry, session_id, *chunk_id);
d2956687
JG
1009 if (!trace_chunk) {
1010 ERR("Failed to find trace chunk reference during creation of channel");
1011 goto end;
1012 }
1013 }
3bd1e081 1014
32670d71
JG
1015 try {
1016 channel = new lttng_consumer_channel;
1017 } catch (const std::bad_alloc& e) {
1018 ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
1019 channel = nullptr;
3bd1e081
MD
1020 goto end;
1021 }
ffe60014
DG
1022
1023 channel->key = key;
3bd1e081 1024 channel->refcount = 0;
ffe60014 1025 channel->session_id = session_id;
1950109e 1026 channel->session_id_per_pid = session_id_per_pid;
ffe60014 1027 channel->relayd_id = relayd_id;
1624d5b7
JD
1028 channel->tracefile_size = tracefile_size;
1029 channel->tracefile_count = tracefile_count;
2bba9e53 1030 channel->monitor = monitor;
ecc48a90 1031 channel->live_timer_interval = live_timer_interval;
a2814ea7 1032 channel->is_live = is_in_live_session;
07c4863f
JG
1033 pthread_mutex_init(&channel->lock, nullptr);
1034 pthread_mutex_init(&channel->timer_lock, nullptr);
ffe60014 1035
0c759fc9
DG
1036 switch (output) {
1037 case LTTNG_EVENT_SPLICE:
1038 channel->output = CONSUMER_CHANNEL_SPLICE;
1039 break;
1040 case LTTNG_EVENT_MMAP:
1041 channel->output = CONSUMER_CHANNEL_MMAP;
1042 break;
1043 default:
a0377dfe 1044 abort();
32670d71 1045 delete channel;
cd9adb8b 1046 channel = nullptr;
0c759fc9
DG
1047 goto end;
1048 }
1049
07b86b52
JD
1050 /*
1051 * In monitor mode, the streams associated with the channel will be put in
1052 * a special list ONLY owned by this channel. So, the refcount is set to 1
1053 * here meaning that the channel itself has streams that are referenced.
1054 *
1055 * On a channel deletion, once the channel is no longer visible, the
1056 * refcount is decremented and checked for a zero value to delete it. With
1057 * streams in no monitor mode, it will now be safe to destroy the channel.
1058 */
1059 if (!channel->monitor) {
1060 channel->refcount = 1;
1061 }
1062
ffe60014
DG
1063 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
1064 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
1065
1066 strncpy(channel->name, name, sizeof(channel->name));
1067 channel->name[sizeof(channel->name) - 1] = '\0';
1068
3d071855
MD
1069 if (root_shm_path) {
1070 strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
1071 channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
1072 }
d7ba1388
MD
1073 if (shm_path) {
1074 strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
1075 channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
1076 }
1077
d88aee68 1078 lttng_ht_node_init_u64(&channel->node, channel->key);
28ab034a 1079 lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);
d8ef542d
MD
1080
1081 channel->wait_fd = -1;
ffe60014
DG
1082 CDS_INIT_LIST_HEAD(&channel->streams.head);
1083
d2956687 1084 if (trace_chunk) {
07c4863f 1085 const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
d2956687
JG
1086 if (ret) {
1087 goto error;
1088 }
1089 }
1090
62a7b8ed 1091 DBG("Allocated channel (key %" PRIu64 ")", channel->key);
3bd1e081 1092
3bd1e081 1093end:
d2956687 1094 lttng_trace_chunk_put(trace_chunk);
3bd1e081 1095 return channel;
d2956687
JG
1096error:
1097 consumer_del_channel(channel);
cd9adb8b 1098 channel = nullptr;
d2956687 1099 goto end;
3bd1e081
MD
1100}
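
/*
 * consumer_allocate_channel() only builds the object; it becomes visible to
 * the rest of the consumer once consumer_add_channel() publishes it in the
 * global hash tables. A hedged sketch of the usual sequence (error handling
 * elided):
 *
 *	struct lttng_consumer_channel *chan = consumer_allocate_channel(
 *		key, session_id, chunk_id, pathname, name, relayd_id, output,
 *		tracefile_size, tracefile_count, session_id_per_pid, monitor,
 *		live_timer_interval, is_in_live_session, root_shm_path, shm_path);
 *
 *	if (chan) {
 *		(void) consumer_add_channel(chan, ctx);
 *	}
 */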
1101
1102/*
1103 * Add a channel to the global list protected by a mutex.
821fffb2 1104 *
b5a6470f 1105 * Always return 0 indicating success.
3bd1e081 1106 */
d8ef542d 1107int consumer_add_channel(struct lttng_consumer_channel *channel,
28ab034a 1108 struct lttng_consumer_local_data *ctx)
3bd1e081 1109{
fa29bfbf 1110 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 1111 pthread_mutex_lock(&channel->lock);
ec6ea7d0 1112 pthread_mutex_lock(&channel->timer_lock);
c77fc10a 1113
b5a6470f
DG
1114 /*
1115 * This gives us a guarantee that the channel we are about to add to the
1116 * channel hash table will be unique. See the comment above steal_channel_key()
1117 * on why we need to steal the channel key at this stage.
1118 */
1119 steal_channel_key(channel->key);
c77fc10a 1120
07c4863f 1121 const lttng::urcu::read_lock_guard read_lock;
fa29bfbf
SM
1122 lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
1123 lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
28ab034a 1124 &channel->channels_by_session_id_ht_node);
d2956687 1125 channel->is_published = true;
b5a6470f 1126
ec6ea7d0 1127 pthread_mutex_unlock(&channel->timer_lock);
a9838785 1128 pthread_mutex_unlock(&channel->lock);
fa29bfbf 1129 pthread_mutex_unlock(&the_consumer_data.lock);
702b1ea4 1130
b5a6470f 1131 if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
a0cbdd2e 1132 notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
d8ef542d 1133 }
b5a6470f
DG
1134
1135 return 0;
3bd1e081
MD
1136}
1137
1138/*
1139 * Allocate the pollfd structure and the local view of the out fds to avoid
1140 * doing a lookup in the linked list and concurrency issues when writing is
1141 * needed. Called with consumer_data.lock held.
1142 *
1143 * Returns the number of fds in the structures.
1144 */
ffe60014 1145static int update_poll_array(struct lttng_consumer_local_data *ctx,
28ab034a
JG
1146 struct pollfd **pollfd,
1147 struct lttng_consumer_stream **local_stream,
1148 struct lttng_ht *ht,
1149 int *nb_inactive_fd)
3bd1e081 1150{
3bd1e081
MD
1151 int i = 0;
1152
a0377dfe
FD
1153 LTTNG_ASSERT(ctx);
1154 LTTNG_ASSERT(ht);
1155 LTTNG_ASSERT(pollfd);
1156 LTTNG_ASSERT(local_stream);
ffe60014 1157
3bd1e081 1158 DBG("Updating poll fd array");
9a2fcf78 1159 *nb_inactive_fd = 0;
56047f5a 1160
c3ade133
JG
1161 for (auto *stream :
1162 lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
1163 decltype(lttng_consumer_stream::node),
1164 &lttng_consumer_stream::node>(*ht->ht)) {
1165 /*
1166 * Only active streams with an active end point can be added to the
1167 * poll set and local stream storage of the thread.
1168 *
1169 * There is a potential race here for endpoint_status to be updated
1170 * just after the check. However, this is OK since the stream(s) will
1171 * be deleted once the thread is notified that the end point state has
1172 * changed where this function will be called back again.
1173 *
1174 * We track the number of inactive FDs because they still need to be
1175 * closed by the polling thread after a wakeup on the data_pipe or
1176 * metadata_pipe.
1177 */
1178 if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
1179 (*nb_inactive_fd)++;
1180 continue;
3bd1e081 1181 }
c3ade133
JG
1182
1183 (*pollfd)[i].fd = stream->wait_fd;
1184 (*pollfd)[i].events = POLLIN | POLLPRI;
1185 local_stream[i] = stream;
1186 i++;
3bd1e081
MD
1187 }
1188
1189 /*
50f8ae69 1190 * Insert the consumer_data_pipe at the end of the array and don't
3bd1e081
MD
1191 * increment i so nb_fd is the number of real FD.
1192 */
acdb9057 1193 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
509bb1cf 1194 (*pollfd)[i].events = POLLIN | POLLPRI;
02b3d176
DG
1195
1196 (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
1197 (*pollfd)[i + 1].events = POLLIN | POLLPRI;
3bd1e081
MD
1198 return i;
1199}
1200
1201/*
84382d49
MD
1202 * Poll on the should_quit pipe and the command socket. Return -1 on
1203 * error, 1 if we should exit, 0 if data is available on the command socket.
3bd1e081
MD
1204 */
1205int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1206{
1207 int num_rdy;
1208
88f2b785 1209restart:
3bd1e081
MD
1210 num_rdy = poll(consumer_sockpoll, 2, -1);
1211 if (num_rdy == -1) {
88f2b785
MD
1212 /*
1213 * Restart interrupted system call.
1214 */
1215 if (errno == EINTR) {
1216 goto restart;
1217 }
7a57cf92 1218 PERROR("Poll error");
84382d49 1219 return -1;
3bd1e081 1220 }
509bb1cf 1221 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
3bd1e081 1222 DBG("consumer_should_quit wake up");
84382d49 1223 return 1;
3bd1e081
MD
1224 }
1225 return 0;
3bd1e081
MD
1226}
1227
1228/*
1229 * Set the error socket.
1230 */
28ab034a 1231void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
3bd1e081
MD
1232{
1233 ctx->consumer_error_socket = sock;
1234}
1235
1236/*
1237 * Set the command socket path.
1238 */
28ab034a 1239void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
3bd1e081
MD
1240{
1241 ctx->consumer_command_sock_path = sock;
1242}
1243
1244/*
1245 * Send return code to the session daemon.
1246 * If the socket is not defined, we return 0, it is not a fatal error
1247 */
fbd566c2
JG
1248int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
1249 enum lttcomm_return_code error_code)
3bd1e081
MD
1250{
1251 if (ctx->consumer_error_socket > 0) {
fbd566c2
JG
1252 const std::int32_t comm_code = std::int32_t(error_code);
1253
1254 static_assert(
1255 sizeof(comm_code) >= sizeof(std::underlying_type_t<lttcomm_return_code>),
1256 "Fixed-size communication type too small to accommodate lttcomm_return_code");
28ab034a 1257 return lttcomm_send_unix_sock(
fbd566c2 1258 ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
3bd1e081
MD
1259 }
1260
1261 return 0;
1262}
1263
1264/*
228b5bf7
DG
1265 * Close all the tracefiles and stream fds. MUST be called when all instances
1266 * are destroyed, i.e. when all threads were joined and have ended.
3bd1e081 1267 */
cd9adb8b 1268void lttng_consumer_cleanup()
3bd1e081 1269{
e10aec8f 1270 unsigned int trace_chunks_left;
6065ceec 1271
c3ade133
JG
1272 for (auto *channel :
1273 lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
1274 decltype(lttng_consumer_channel::node),
1275 &lttng_consumer_channel::node>(
1276 *the_consumer_data.channel_ht->ht)) {
1277 consumer_del_channel(channel);
3bd1e081 1278 }
6065ceec 1279
fa29bfbf
SM
1280 lttng_ht_destroy(the_consumer_data.channel_ht);
1281 lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
228b5bf7
DG
1282
1283 cleanup_relayd_ht();
1284
fa29bfbf 1285 lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);
d8ef542d 1286
228b5bf7
DG
1287 /*
1288 * This HT contains streams that are freed by either the metadata thread or
1289 * the data thread so we do *nothing* on the hash table and simply destroy
1290 * it.
1291 */
fa29bfbf 1292 lttng_ht_destroy(the_consumer_data.stream_list_ht);
28cc88f3 1293
e10aec8f
MD
1294 /*
1295 * Trace chunks in the registry may still exist if the session
1296 * daemon has encountered an internal error and could not
1297 * tear down its sessions and/or trace chunks properly.
1298 *
1299 * Release the session daemon's implicit reference to any remaining
1300 * trace chunk and print an error if any trace chunk was found. Note
1301 * that there are _no_ legitimate cases for trace chunks to be left,
1302 * it is a leak. However, it can happen following a crash of the
1303 * session daemon and not emptying the registry would cause an assertion
1304 * to hit.
1305 */
28ab034a
JG
1306 trace_chunks_left =
1307 lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
e10aec8f
MD
1308 if (trace_chunks_left) {
1309 ERR("%u trace chunks are leaked by lttng-consumerd. "
28ab034a
JG
1310 "This can be caused by an internal error of the session daemon.",
1311 trace_chunks_left);
e10aec8f
MD
1312 }
1313 /* Run all callbacks freeing each chunk. */
1314 rcu_barrier();
fa29bfbf 1315 lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
3bd1e081
MD
1316}
1317
1318/*
1319 * Called from signal handler.
1320 */
1321void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1322{
6cd525e8
MD
1323 ssize_t ret;
1324
10211f5c 1325 CMM_STORE_SHARED(consumer_quit, 1);
6cd525e8
MD
1326 ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
1327 if (ret < 1) {
7a57cf92 1328 PERROR("write consumer quit");
3bd1e081 1329 }
ab1027f4
DG
1330
1331 DBG("Consumer flag that it should quit");
3bd1e081
MD
1332}
1333
5199ffc4
JG
1334/*
1335 * Flush pending writes to trace output disk file.
1336 */
28ab034a 1337static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
3bd1e081 1338{
07c4863f 1339 const int outfd = stream->out_fd;
3bd1e081
MD
1340
1341 /*
1342 * This does a blocking write-and-wait on any page that belongs to the
1343 * subbuffer prior to the one we just wrote.
1344 * Don't care about error values, as these are just hints and ways to
1345 * limit the amount of page cache used.
1346 */
ffe60014 1347 if (orig_offset < stream->max_sb_size) {
3bd1e081
MD
1348 return;
1349 }
671e39d7
MJ
1350 lttng::io::hint_flush_range_dont_need_sync(
1351 outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
3bd1e081
MD
1352}
1353
1354/*
1355 * Initialise the necessary environment:
1356 * - create a new context
1357 * - create the poll_pipe
1358 * - create the should_quit pipe (for signal handler)
1359 * - create the thread pipe (for splice)
1360 *
1361 * Takes a function pointer as argument; this function is called when data is
1362 * available on a buffer. It is responsible for calling kernctl_get_next_subbuf,
1363 * reading the data with mmap or splice depending on the buffer configuration,
1364 * and then calling kernctl_put_next_subbuf at the end.
1365 *
1366 * Returns a pointer to the new context or NULL on error.
1367 */
28ab034a
JG
1368struct lttng_consumer_local_data *
1369lttng_consumer_create(enum lttng_consumer_type type,
1370 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
1371 struct lttng_consumer_local_data *ctx,
1372 bool locked_by_caller),
1373 int (*recv_channel)(struct lttng_consumer_channel *channel),
1374 int (*recv_stream)(struct lttng_consumer_stream *stream),
1375 int (*update_stream)(uint64_t stream_key, uint32_t state))
3bd1e081 1376{
d8ef542d 1377 int ret;
3bd1e081
MD
1378 struct lttng_consumer_local_data *ctx;
1379
a0377dfe 1380 LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
28ab034a 1381 the_consumer_data.type == type);
fa29bfbf 1382 the_consumer_data.type = type;
3bd1e081 1383
64803277 1384 ctx = zmalloc<lttng_consumer_local_data>();
cd9adb8b 1385 if (ctx == nullptr) {
7a57cf92 1386 PERROR("allocating context");
3bd1e081
MD
1387 goto error;
1388 }
1389
1390 ctx->consumer_error_socket = -1;
331744e3 1391 ctx->consumer_metadata_socket = -1;
cd9adb8b 1392 pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
3bd1e081
MD
1393 /* assign the callbacks */
1394 ctx->on_buffer_ready = buffer_ready;
1395 ctx->on_recv_channel = recv_channel;
1396 ctx->on_recv_stream = recv_stream;
1397 ctx->on_update_stream = update_stream;
1398
acdb9057
DG
1399 ctx->consumer_data_pipe = lttng_pipe_open(0);
1400 if (!ctx->consumer_data_pipe) {
3bd1e081
MD
1401 goto error_poll_pipe;
1402 }
1403
02b3d176
DG
1404 ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
1405 if (!ctx->consumer_wakeup_pipe) {
1406 goto error_wakeup_pipe;
1407 }
1408
3bd1e081
MD
1409 ret = pipe(ctx->consumer_should_quit);
1410 if (ret < 0) {
7a57cf92 1411 PERROR("Error creating recv pipe");
3bd1e081
MD
1412 goto error_quit_pipe;
1413 }
1414
d8ef542d
MD
1415 ret = pipe(ctx->consumer_channel_pipe);
1416 if (ret < 0) {
1417 PERROR("Error creating channel pipe");
1418 goto error_channel_pipe;
1419 }
1420
13886d2d
DG
1421 ctx->consumer_metadata_pipe = lttng_pipe_open(0);
1422 if (!ctx->consumer_metadata_pipe) {
fb3a43a9
DG
1423 goto error_metadata_pipe;
1424 }
3bd1e081 1425
e9404c27
JG
1426 ctx->channel_monitor_pipe = -1;
1427
fb3a43a9 1428 return ctx;
3bd1e081 1429
fb3a43a9 1430error_metadata_pipe:
d8ef542d
MD
1431 utils_close_pipe(ctx->consumer_channel_pipe);
1432error_channel_pipe:
d8ef542d 1433 utils_close_pipe(ctx->consumer_should_quit);
3bd1e081 1434error_quit_pipe:
02b3d176
DG
1435 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1436error_wakeup_pipe:
acdb9057 1437 lttng_pipe_destroy(ctx->consumer_data_pipe);
3bd1e081
MD
1438error_poll_pipe:
1439 free(ctx);
1440error:
cd9adb8b 1441 return nullptr;
3bd1e081
MD
1442}
1443
282dadbc
MD
1444/*
1445 * Iterate over all streams of the hashtable and free them properly.
1446 */
1447static void destroy_data_stream_ht(struct lttng_ht *ht)
1448{
cd9adb8b 1449 if (ht == nullptr) {
282dadbc
MD
1450 return;
1451 }
1452
c3ade133
JG
1453 for (auto *stream :
1454 lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
1455 decltype(lttng_consumer_stream::node),
1456 &lttng_consumer_stream::node>(*ht->ht)) {
1457 /*
1458 * Ignore return value since we are currently cleaning up so any error
1459 * can't be handled.
1460 */
1461 (void) consumer_del_stream(stream, ht);
282dadbc 1462 }
282dadbc
MD
1463
1464 lttng_ht_destroy(ht);
1465}
1466
1467/*
1468 * Iterate over all streams of the metadata hashtable and free them
1469 * properly.
1470 */
1471static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1472{
cd9adb8b 1473 if (ht == nullptr) {
282dadbc
MD
1474 return;
1475 }
1476
c3ade133
JG
1477 for (auto *stream :
1478 lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
1479 decltype(lttng_consumer_stream::node),
1480 &lttng_consumer_stream::node>(*ht->ht)) {
1481 /*
1482 * Ignore return value since we are currently cleaning up so any error
1483 * can't be handled.
1484 */
1485 (void) consumer_del_metadata_stream(stream, ht);
282dadbc 1486 }
282dadbc
MD
1487
1488 lttng_ht_destroy(ht);
1489}
1490
3bd1e081
MD
1491/*
1492 * Close all fds associated with the instance and free the context.
1493 */
1494void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1495{
4c462e79
MD
1496 int ret;
1497
ab1027f4
DG
1498 DBG("Consumer destroying it. Closing everything.");
1499
4f2e75b9
DG
1500 if (!ctx) {
1501 return;
1502 }
1503
282dadbc
MD
1504 destroy_data_stream_ht(data_ht);
1505 destroy_metadata_stream_ht(metadata_ht);
1506
4c462e79
MD
1507 ret = close(ctx->consumer_error_socket);
1508 if (ret) {
1509 PERROR("close");
1510 }
331744e3
JD
1511 ret = close(ctx->consumer_metadata_socket);
1512 if (ret) {
1513 PERROR("close");
1514 }
d8ef542d 1515 utils_close_pipe(ctx->consumer_channel_pipe);
acdb9057 1516 lttng_pipe_destroy(ctx->consumer_data_pipe);
13886d2d 1517 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
02b3d176 1518 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
d8ef542d 1519 utils_close_pipe(ctx->consumer_should_quit);
fb3a43a9 1520
3bd1e081
MD
1521 unlink(ctx->consumer_command_sock_path);
1522 free(ctx);
1523}
1524
6197aea7
DG
1525/*
1526 * Write the metadata stream id on the specified file descriptor.
1527 */
28ab034a
JG
1528static int
1529write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
6197aea7 1530{
6cd525e8 1531 ssize_t ret;
1d4dfdef 1532 struct lttcomm_relayd_metadata_payload hdr;
6197aea7 1533
1d4dfdef
DG
1534 hdr.stream_id = htobe64(stream->relayd_stream_id);
1535 hdr.padding_size = htobe32(padding);
6cd525e8
MD
1536 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1537 if (ret < sizeof(hdr)) {
d7b75ec8 1538 /*
6f04ed72 1539 * This error means that the fd's end is closed so ignore the PERROR
d7b75ec8
DG
1540 * so as not to clutter the error output, since this can happen in a normal
1541 * code path.
1542 */
1543 if (errno != EPIPE) {
1544 PERROR("write metadata stream id");
1545 }
1546 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
534d2592
DG
1547 /*
1548 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1549 * handle writing the missing part, so report that as an error and
1550 * don't lie to the caller.
1551 */
1552 ret = -1;
6197aea7
DG
1553 goto end;
1554 }
1d4dfdef 1555 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
28ab034a
JG
1556 stream->relayd_stream_id,
1557 padding);
6197aea7
DG
1558
1559end:
6cd525e8 1560 return (int) ret;
6197aea7
DG
1561}
1562
3bd1e081 1563/*
09e26845
DG
1564 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1565 * core function for writing trace buffers to either the local filesystem or
1566 * the network.
1567 *
d2956687 1568 * It must be called with the stream and the channel lock held.
79d4ffb7 1569 *
09e26845 1570 * Careful review MUST be put if any changes occur!
3bd1e081
MD
1571 *
1572 * Returns the number of bytes written
1573 */
28ab034a
JG
1574ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
1575 const struct lttng_buffer_view *buffer,
1576 unsigned long padding)
3bd1e081 1577{
994ab360 1578 ssize_t ret = 0;
f02e1e8a
DG
1579 off_t orig_offset = stream->out_fd_offset;
1580 /* Default is on the disk */
1581 int outfd = stream->out_fd;
cd9adb8b 1582 struct consumer_relayd_sock_pair *relayd = nullptr;
8994307f 1583 unsigned int relayd_hang_up = 0;
fd424d99
JG
1584 const size_t subbuf_content_size = buffer->size - padding;
1585 size_t write_len;
f02e1e8a
DG
1586
1587 /* RCU lock for the relayd pointer */
07c4863f 1588 const lttng::urcu::read_lock_guard read_lock;
28ab034a 1589 LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
d2956687 1590
f02e1e8a 1591 /* Flag that the current stream if set for network streaming. */
da009f2c 1592 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1593 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1594 if (relayd == nullptr) {
56591bac 1595 ret = -EPIPE;
f02e1e8a
DG
1596 goto end;
1597 }
1598 }
1599
f02e1e8a
DG
1600 /* Handle stream on the relayd if the output is on the network */
1601 if (relayd) {
fd424d99 1602 unsigned long netlen = subbuf_content_size;
f02e1e8a
DG
1603
1604 /*
1605 * Lock the control socket for the complete duration of the function
1606 * since from this point on we will use the socket.
1607 */
1608 if (stream->metadata_flag) {
1609 /* Metadata requires the control socket. */
1610 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
93ec662e
JD
1611 if (stream->reset_metadata_flag) {
1612 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1613 stream->relayd_stream_id,
1614 stream->metadata_version);
93ec662e
JD
1615 if (ret < 0) {
1616 relayd_hang_up = 1;
1617 goto write_error;
1618 }
1619 stream->reset_metadata_flag = 0;
1620 }
1d4dfdef 1621 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
f02e1e8a
DG
1622 }
1623
1d4dfdef 1624 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
994ab360
DG
1625 if (ret < 0) {
1626 relayd_hang_up = 1;
1627 goto write_error;
1628 }
1629 /* Use the returned socket. */
1630 outfd = ret;
f02e1e8a 1631
994ab360
DG
1632 /* Write metadata stream id before payload */
1633 if (stream->metadata_flag) {
239f61af 1634 ret = write_relayd_metadata_id(outfd, stream, padding);
994ab360 1635 if (ret < 0) {
8994307f
DG
1636 relayd_hang_up = 1;
1637 goto write_error;
1638 }
f02e1e8a 1639 }
1624d5b7 1640
fd424d99
JG
1641 write_len = subbuf_content_size;
1642 } else {
1643 /* No streaming; we have to write the full padding. */
93ec662e
JD
1644 if (stream->metadata_flag && stream->reset_metadata_flag) {
1645 ret = utils_truncate_stream_file(stream->out_fd, 0);
1646 if (ret < 0) {
1647 ERR("Reset metadata file");
1648 goto end;
1649 }
1650 stream->reset_metadata_flag = 0;
1651 }
1652
1624d5b7
JD
1653 /*
1654 * Check if we need to change the tracefile before writing the packet.
1655 */
1656 if (stream->chan->tracefile_size > 0 &&
28ab034a
JG
1657 (stream->tracefile_size_current + buffer->size) >
1658 stream->chan->tracefile_size) {
d2956687
JG
1659 ret = consumer_stream_rotate_output_files(stream);
1660 if (ret) {
1624d5b7
JD
1661 goto end;
1662 }
309167d2 1663 outfd = stream->out_fd;
a1ae300f 1664 orig_offset = 0;
1624d5b7 1665 }
fd424d99 1666 stream->tracefile_size_current += buffer->size;
fd424d99 1667 write_len = buffer->size;
f02e1e8a
DG
1668 }
1669
d02b8372
DG
1670 /*
1671 * This call guarantees that write_len or less is returned. It's impossible to
1672 * receive a ret value that is bigger than write_len.
1673 */
fd424d99 1674 ret = lttng_write(outfd, buffer->data, write_len);
e2d1190b 1675 DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
fd424d99 1676 if (ret < 0 || ((size_t) ret != write_len)) {
d02b8372
DG
1677 /*
1678 * Report error to caller if nothing was written else at least send the
1679 * amount written.
1680 */
1681 if (ret < 0) {
994ab360 1682 ret = -errno;
f02e1e8a 1683 }
994ab360 1684 relayd_hang_up = 1;
f02e1e8a 1685
d02b8372 1686 /* Socket operation failed. We consider the relayd dead */
fcf0f774 1687 if (errno == EPIPE) {
d02b8372
DG
1688 /*
1689 * This is possible if the fd is closed on the other side
1690 * (outfd) or any write problem. It can be a bit verbose during a
1691 * normal execution if, for instance, the relayd is stopped
1692 * abruptly. Since this can happen, keep this as a DBG statement.
1693 */
1694 DBG("Consumer mmap write detected relayd hang up");
994ab360
DG
1695 } else {
1696 /* Unhandled error, print it and stop function right now. */
28ab034a 1697 PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
f02e1e8a 1698 }
994ab360 1699 goto write_error;
d02b8372
DG
1700 }
1701 stream->output_written += ret;
d02b8372
DG
1702
1703 /* This call is useless on a socket so better save a syscall. */
1704 if (!relayd) {
1705 /* This won't block, but will start writeout asynchronously */
671e39d7 1706 lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
fd424d99 1707 stream->out_fd_offset += write_len;
f5dbe415 1708 lttng_consumer_sync_trace_file(stream, orig_offset);
f02e1e8a 1709 }
f02e1e8a 1710
8994307f
DG
1711write_error:
1712 /*
1713 * This is a special case where the relayd has closed its socket. Let's
1714 * clean up the relayd object and all associated streams.
1715 */
1716 if (relayd && relayd_hang_up) {
28ab034a 1717 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1718 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1719 }
1720
f02e1e8a
DG
1721end:
1722 /* Unlock only if ctrl socket used */
1723 if (relayd && stream->metadata_flag) {
1724 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1725 }
1726
994ab360 1727 return ret;
3bd1e081
MD
1728}
1729
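/*
 * Illustrative sketch (not lttng-tools code): after writing to a local trace
 * file, the consumer only *hints* that writeback of the new range should
 * start without blocking. One plain Linux way to express that intent is
 * sync_file_range(); whether lttng::io::hint_flush_range_async() is built on
 * this exact call is an assumption made for the example.
 */
#ifdef __linux__
#ifndef _GNU_SOURCE
#define _GNU_SOURCE /* Exposes sync_file_range() in <fcntl.h> on glibc. */
#endif
#include <fcntl.h>

/* Best-effort: start asynchronous writeback of [offset, offset + len). */
static inline void hint_flush_range_async_sketch(int fd, off_t offset, off_t len)
{
	/* Errors are ignored on purpose; this is only a performance hint. */
	(void) sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE);
}
#endif /* __linux__ */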
1730/*
1731 * Splice the data from the ring buffer to the tracefile.
1732 *
79d4ffb7
DG
1733 * It must be called with the stream lock held.
1734 *
3bd1e081
MD
1735 * Returns the number of bytes spliced.
1736 */
28ab034a
JG
1737ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
1738 struct lttng_consumer_stream *stream,
1739 unsigned long len,
1740 unsigned long padding)
3bd1e081 1741{
f02e1e8a
DG
1742 ssize_t ret = 0, written = 0, ret_splice = 0;
1743 loff_t offset = 0;
1744 off_t orig_offset = stream->out_fd_offset;
07c4863f 1745 const int fd = stream->wait_fd;
f02e1e8a
DG
1746 /* Default is on the disk */
1747 int outfd = stream->out_fd;
cd9adb8b 1748 struct consumer_relayd_sock_pair *relayd = nullptr;
fb3a43a9 1749 int *splice_pipe;
8994307f 1750 unsigned int relayd_hang_up = 0;
f02e1e8a 1751
fa29bfbf 1752 switch (the_consumer_data.type) {
3bd1e081 1753 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1754 break;
7753dea8
MD
1755 case LTTNG_CONSUMER32_UST:
1756 case LTTNG_CONSUMER64_UST:
f02e1e8a 1757 /* Not supported for user space tracing */
3bd1e081
MD
1758 return -ENOSYS;
1759 default:
1760 ERR("Unknown consumer_data type");
a0377dfe 1761 abort();
3bd1e081
MD
1762 }
1763
f02e1e8a 1764 /* RCU lock for the relayd pointer */
07c4863f 1765 const lttng::urcu::read_lock_guard read_lock;
f02e1e8a
DG
1766
1767 /* Check whether the current stream is set for network streaming. */
da009f2c 1768 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1769 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1770 if (relayd == nullptr) {
ad0b0d23 1771 written = -ret;
f02e1e8a
DG
1772 goto end;
1773 }
1774 }
a2361a61 1775 splice_pipe = stream->splice_pipe;
fb3a43a9 1776
f02e1e8a 1777 /* Write metadata stream id before payload */
1d4dfdef 1778 if (relayd) {
ad0b0d23 1779 unsigned long total_len = len;
f02e1e8a 1780
1d4dfdef
DG
1781 if (stream->metadata_flag) {
1782 /*
1783 * Lock the control socket for the complete duration of the function
1784 * since from this point on we will use the socket.
1785 */
1786 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1787
93ec662e
JD
1788 if (stream->reset_metadata_flag) {
1789 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1790 stream->relayd_stream_id,
1791 stream->metadata_version);
93ec662e
JD
1792 if (ret < 0) {
1793 relayd_hang_up = 1;
1794 goto write_error;
1795 }
1796 stream->reset_metadata_flag = 0;
1797 }
28ab034a 1798 ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
1d4dfdef
DG
1799 if (ret < 0) {
1800 written = ret;
ad0b0d23
DG
1801 relayd_hang_up = 1;
1802 goto write_error;
1d4dfdef
DG
1803 }
1804
1805 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1806 }
1807
1808 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
ad0b0d23
DG
1809 if (ret < 0) {
1810 written = ret;
1811 relayd_hang_up = 1;
1812 goto write_error;
f02e1e8a 1813 }
ad0b0d23
DG
1814 /* Use the returned socket. */
1815 outfd = ret;
1d4dfdef
DG
1816 } else {
1817 /* No streaming; include the full padding in the length to write. */
1818 len += padding;
1624d5b7 1819
93ec662e
JD
1820 if (stream->metadata_flag && stream->reset_metadata_flag) {
1821 ret = utils_truncate_stream_file(stream->out_fd, 0);
1822 if (ret < 0) {
1823 ERR("Reset metadata file");
1824 goto end;
1825 }
1826 stream->reset_metadata_flag = 0;
1827 }
1624d5b7
JD
1828 /*
1829 * Check if we need to change the tracefile before writing the packet.
1830 */
1831 if (stream->chan->tracefile_size > 0 &&
28ab034a 1832 (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
d2956687 1833 ret = consumer_stream_rotate_output_files(stream);
1624d5b7 1834 if (ret < 0) {
ad0b0d23 1835 written = ret;
1624d5b7
JD
1836 goto end;
1837 }
309167d2 1838 outfd = stream->out_fd;
a1ae300f 1839 orig_offset = 0;
1624d5b7
JD
1840 }
1841 stream->tracefile_size_current += len;
f02e1e8a
DG
1842 }
1843
1844 while (len > 0) {
1d4dfdef 1845 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
28ab034a
JG
1846 (unsigned long) offset,
1847 len,
1848 fd,
1849 splice_pipe[1]);
1850 ret_splice = splice(
cd9adb8b 1851 fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
f02e1e8a
DG
1852 DBG("splice chan to pipe, ret %zd", ret_splice);
1853 if (ret_splice < 0) {
d02b8372 1854 ret = errno;
ad0b0d23 1855 written = -ret;
d02b8372 1856 PERROR("Error in relay splice");
f02e1e8a
DG
1857 goto splice_error;
1858 }
1859
1860 /* Handle stream on the relayd if the output is on the network */
ad0b0d23 1861 if (relayd && stream->metadata_flag) {
07c4863f 1862 const size_t metadata_payload_size =
ad0b0d23
DG
1863 sizeof(struct lttcomm_relayd_metadata_payload);
1864
1865 /* Update counter to fit the spliced data */
1866 ret_splice += metadata_payload_size;
1867 len += metadata_payload_size;
1868 /*
1869 * We do this so the return value can match the len passed as
1870 * argument to this function.
1871 */
1872 written -= metadata_payload_size;
f02e1e8a
DG
1873 }
1874
1875 /* Splice data out */
28ab034a 1876 ret_splice = splice(splice_pipe[0],
cd9adb8b 1877 nullptr,
28ab034a 1878 outfd,
cd9adb8b 1879 nullptr,
28ab034a
JG
1880 ret_splice,
1881 SPLICE_F_MOVE | SPLICE_F_MORE);
1882 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
f02e1e8a 1883 if (ret_splice < 0) {
d02b8372 1884 ret = errno;
ad0b0d23
DG
1885 written = -ret;
1886 relayd_hang_up = 1;
1887 goto write_error;
f02e1e8a 1888 } else if (ret_splice > len) {
d02b8372
DG
1889 /*
1890 * We don't expect this code path to be executed, but you never know,
1891 * so this is an extra protection against a buggy splice().
1892 */
f02e1e8a 1893 ret = errno;
ad0b0d23 1894 written += ret_splice;
28ab034a 1895 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
f02e1e8a 1896 goto splice_error;
d02b8372
DG
1897 } else {
1898 /* All good, update current len and continue. */
1899 len -= ret_splice;
f02e1e8a 1900 }
f02e1e8a
DG
1901
1902 /* This call is useless on a socket so better save a syscall. */
1903 if (!relayd) {
1904 /* This won't block, but will start writeout asynchronously */
671e39d7 1905 lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
f02e1e8a
DG
1906 stream->out_fd_offset += ret_splice;
1907 }
e5d1a9b3 1908 stream->output_written += ret_splice;
f02e1e8a
DG
1909 written += ret_splice;
1910 }
f5dbe415
JG
1911 if (!relayd) {
1912 lttng_consumer_sync_trace_file(stream, orig_offset);
1913 }
f02e1e8a
DG
1914 goto end;
1915
8994307f
DG
1916write_error:
1917 /*
1918 * This is a special case where the relayd has closed its socket. Let's
1919 * clean up the relayd object and all associated streams.
1920 */
1921 if (relayd && relayd_hang_up) {
28ab034a 1922 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1923 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1924 /* Skip splice error so the consumer does not fail */
1925 goto end;
1926 }
1927
f02e1e8a
DG
1928splice_error:
1929 /* send the appropriate error description to sessiond */
1930 switch (ret) {
f02e1e8a 1931 case EINVAL:
f73fabfd 1932 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
1933 break;
1934 case ENOMEM:
f73fabfd 1935 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
1936 break;
1937 case ESPIPE:
f73fabfd 1938 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
1939 break;
1940 }
1941
1942end:
1943 if (relayd && stream->metadata_flag) {
1944 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1945 }
1946
f02e1e8a 1947 return written;
3bd1e081
MD
1948}
1949
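/*
 * Illustrative sketch (not lttng-tools code): the splice path above moves the
 * data in two hops, source fd -> pipe -> output fd, without copying it through
 * user space. A minimal standalone version of that loop, with the error
 * handling reduced to a sentinel, looks like this (Linux only).
 */
#ifdef __linux__
#ifndef _GNU_SOURCE
#define _GNU_SOURCE /* Exposes splice() and SPLICE_F_* in <fcntl.h> on glibc. */
#endif
#include <fcntl.h>
#include <unistd.h>

/* Relay up to len bytes from in_fd to out_fd through the pipe pipefd[2]. */
static ssize_t splice_relay_sketch(int in_fd, int out_fd, int pipefd[2], size_t len)
{
	ssize_t total = 0;

	while (len > 0) {
		/* First hop: source fd into the pipe's write end. */
		const ssize_t moved = splice(
			in_fd, nullptr, pipefd[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);

		if (moved < 0) {
			return -1;
		}
		if (moved == 0) {
			break; /* Nothing left to read from the source. */
		}

		/* Second hop: drain the pipe into the output fd. */
		ssize_t left = moved;
		while (left > 0) {
			const ssize_t out = splice(pipefd[0],
						   nullptr,
						   out_fd,
						   nullptr,
						   left,
						   SPLICE_F_MOVE | SPLICE_F_MORE);

			if (out < 0) {
				return -1;
			}
			left -= out;
			total += out;
		}

		len -= static_cast<size_t>(moved);
	}

	return total;
}
#endif /* __linux__ */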
15055ce5
JD
1950/*
1951 * Sample the snapshot positions for a specific fd
1952 *
1953 * Returns 0 on success, < 0 on error
1954 */
1955int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
1956{
fa29bfbf 1957 switch (the_consumer_data.type) {
15055ce5
JD
1958 case LTTNG_CONSUMER_KERNEL:
1959 return lttng_kconsumer_sample_snapshot_positions(stream);
1960 case LTTNG_CONSUMER32_UST:
1961 case LTTNG_CONSUMER64_UST:
1962 return lttng_ustconsumer_sample_snapshot_positions(stream);
1963 default:
1964 ERR("Unknown consumer_data type");
a0377dfe 1965 abort();
15055ce5
JD
1966 return -ENOSYS;
1967 }
1968}
3bd1e081
MD
1969/*
1970 * Take a snapshot for a specific fd
1971 *
1972 * Returns 0 on success, < 0 on error
1973 */
ffe60014 1974int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 1975{
fa29bfbf 1976 switch (the_consumer_data.type) {
3bd1e081 1977 case LTTNG_CONSUMER_KERNEL:
ffe60014 1978 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
1979 case LTTNG_CONSUMER32_UST:
1980 case LTTNG_CONSUMER64_UST:
ffe60014 1981 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
1982 default:
1983 ERR("Unknown consumer_data type");
a0377dfe 1984 abort();
3bd1e081
MD
1985 return -ENOSYS;
1986 }
3bd1e081
MD
1987}
1988
1989/*
1990 * Get the produced position
1991 *
1992 * Returns 0 on success, < 0 on error
1993 */
28ab034a 1994int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 1995{
fa29bfbf 1996 switch (the_consumer_data.type) {
3bd1e081 1997 case LTTNG_CONSUMER_KERNEL:
ffe60014 1998 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
1999 case LTTNG_CONSUMER32_UST:
2000 case LTTNG_CONSUMER64_UST:
ffe60014 2001 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
2002 default:
2003 ERR("Unknown consumer_data type");
a0377dfe 2004 abort();
3bd1e081
MD
2005 return -ENOSYS;
2006 }
2007}
2008
15055ce5
JD
2009/*
2010 * Get the consumed position (free-running counter position in bytes).
2011 *
2012 * Returns 0 on success, < 0 on error
2013 */
28ab034a 2014int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
15055ce5 2015{
fa29bfbf 2016 switch (the_consumer_data.type) {
15055ce5
JD
2017 case LTTNG_CONSUMER_KERNEL:
2018 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2019 case LTTNG_CONSUMER32_UST:
2020 case LTTNG_CONSUMER64_UST:
2021 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2022 default:
2023 ERR("Unknown consumer_data type");
a0377dfe 2024 abort();
15055ce5
JD
2025 return -ENOSYS;
2026 }
2027}
2028
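/*
 * Illustrative sketch (not lttng-tools code): the snapshot helpers above are
 * typically used together: sample the positions first, then read both
 * counters and derive how many bytes sit between the producer and the
 * consumer. The caller is assumed to hold the stream lock and the counters
 * are assumed to be free-running byte counts; errors collapse to a sentinel.
 */
static inline long stream_backlog_bytes_sketch(struct lttng_consumer_stream *stream)
{
	unsigned long produced = 0, consumed = 0;

	if (lttng_consumer_sample_snapshot_positions(stream) < 0) {
		return -1;
	}

	if (lttng_consumer_get_produced_snapshot(stream, &produced) < 0 ||
	    lttng_consumer_get_consumed_snapshot(stream, &consumed) < 0) {
		return -1;
	}

	/* Both counters only grow, so the difference is the unread backlog. */
	return (long) (produced - consumed);
}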
3bd1e081 2029int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
28ab034a
JG
2030 int sock,
2031 struct pollfd *consumer_sockpoll)
3bd1e081 2032{
fa29bfbf 2033 switch (the_consumer_data.type) {
3bd1e081
MD
2034 case LTTNG_CONSUMER_KERNEL:
2035 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
2036 case LTTNG_CONSUMER32_UST:
2037 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
2038 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2039 default:
2040 ERR("Unknown consumer_data type");
a0377dfe 2041 abort();
3bd1e081
MD
2042 return -ENOSYS;
2043 }
2044}
2045
cd9adb8b 2046static void lttng_consumer_close_all_metadata()
d88aee68 2047{
fa29bfbf 2048 switch (the_consumer_data.type) {
d88aee68
DG
2049 case LTTNG_CONSUMER_KERNEL:
2050 /*
2051 * The Kernel consumer has a different metadata scheme so we don't
2052 * close anything because the stream will be closed by the session
2053 * daemon.
2054 */
2055 break;
2056 case LTTNG_CONSUMER32_UST:
2057 case LTTNG_CONSUMER64_UST:
2058 /*
2059 * Close all metadata streams. The metadata hash table is passed and
2060 * this call iterates over it by closing all wakeup fd. This is safe
2061 * because at this point we are sure that the metadata producer is
2062 * either dead or blocked.
2063 */
6d574024 2064 lttng_ustconsumer_close_all_metadata(metadata_ht);
d88aee68
DG
2065 break;
2066 default:
2067 ERR("Unknown consumer_data type");
a0377dfe 2068 abort();
d88aee68
DG
2069 }
2070}
2071
fb3a43a9
DG
2072/*
2073 * Clean up a metadata stream and free its memory.
2074 */
28ab034a 2075void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
fb3a43a9 2076{
cd9adb8b 2077 struct lttng_consumer_channel *channel = nullptr;
a6ef8ee6 2078 bool free_channel = false;
fb3a43a9 2079
a0377dfe 2080 LTTNG_ASSERT(stream);
fb3a43a9
DG
2081 /*
2082 * This call should NEVER receive a regular stream. It must always be
2083 * a metadata stream; this is crucial for data structure synchronization.
2084 */
a0377dfe 2085 LTTNG_ASSERT(stream->metadata_flag);
fb3a43a9 2086
e316aad5
DG
2087 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
2088
fa29bfbf 2089 pthread_mutex_lock(&the_consumer_data.lock);
a6ef8ee6
JG
2090 /*
2091 * Note that this assumes that a stream's channel is never changed and
2092 * that the stream's lock doesn't need to be taken to sample its
2093 * channel.
2094 */
2095 channel = stream->chan;
2096 pthread_mutex_lock(&channel->lock);
3dad2c0f 2097 pthread_mutex_lock(&stream->lock);
a6ef8ee6 2098 if (channel->metadata_cache) {
081424af 2099 /* Only applicable to userspace consumers. */
a6ef8ee6 2100 pthread_mutex_lock(&channel->metadata_cache->lock);
081424af 2101 }
8994307f 2102
6d574024
DG
2103 /* Remove any reference to that stream. */
2104 consumer_stream_delete(stream, ht);
ca22feea 2105
6d574024 2106 /* Close down everything including the relayd if one. */
d119bd01 2107 consumer_stream_close_output(stream);
6d574024
DG
2108 /* Destroy tracer buffers of the stream. */
2109 consumer_stream_destroy_buffers(stream);
fb3a43a9
DG
2110
2111 /* Atomically decrement channel refcount since other threads can use it. */
28ab034a
JG
2112 if (!uatomic_sub_return(&channel->refcount, 1) &&
2113 !uatomic_read(&channel->nb_init_stream_left)) {
c30aaa51 2114 /* Go for channel deletion! */
a6ef8ee6 2115 free_channel = true;
fb3a43a9
DG
2116 }
2117
73811ecc
DG
2118 /*
2119 * Nullify the stream reference so it is not used after deletion. The
6d574024
DG
2120 * channel lock MUST be acquired before being able to check for a NULL
2121 * pointer value.
73811ecc 2122 */
cd9adb8b 2123 channel->metadata_stream = nullptr;
32670d71 2124 channel->metadata_pushed_wait_queue.wake_all();
73811ecc 2125
a6ef8ee6
JG
2126 if (channel->metadata_cache) {
2127 pthread_mutex_unlock(&channel->metadata_cache->lock);
081424af 2128 }
3dad2c0f 2129 pthread_mutex_unlock(&stream->lock);
a6ef8ee6 2130 pthread_mutex_unlock(&channel->lock);
fa29bfbf 2131 pthread_mutex_unlock(&the_consumer_data.lock);
e316aad5 2132
a6ef8ee6
JG
2133 if (free_channel) {
2134 consumer_del_channel(channel);
e316aad5
DG
2135 }
2136
d2956687 2137 lttng_trace_chunk_put(stream->trace_chunk);
cd9adb8b 2138 stream->trace_chunk = nullptr;
6d574024 2139 consumer_stream_free(stream);
fb3a43a9
DG
2140}
2141
2142/*
2143 * Action done with the metadata stream when adding it to the consumer internal
2144 * data structures to handle it.
2145 */
66d583dc 2146void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
fb3a43a9 2147{
5ab66908 2148 struct lttng_ht *ht = metadata_ht;
76082088 2149 struct lttng_ht_iter iter;
d88aee68 2150 struct lttng_ht_node_u64 *node;
fb3a43a9 2151
a0377dfe
FD
2152 LTTNG_ASSERT(stream);
2153 LTTNG_ASSERT(ht);
e316aad5 2154
d88aee68 2155 DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
e316aad5 2156
fa29bfbf 2157 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 2158 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 2159 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 2160 pthread_mutex_lock(&stream->lock);
e316aad5 2161
e316aad5
DG
2162 /*
2163 * From here, refcounts are updated so be _careful_ when returning an error
2164 * after this point.
2165 */
2166
07c4863f 2167 const lttng::urcu::read_lock_guard read_lock;
76082088
DG
2168
2169 /*
2170 * Lookup the stream just to make sure it does not exist in our internal
2171 * state. This should NEVER happen.
2172 */
d88aee68 2173 lttng_ht_lookup(ht, &stream->key, &iter);
00d7d903 2174 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
a0377dfe 2175 LTTNG_ASSERT(!node);
76082088 2176
e316aad5 2177 /*
ffe60014
DG
2178 * When nb_init_stream_left reaches 0, we don't need to trigger any action
2179 * in terms of destroying the associated channel, because the action that
e316aad5
DG
2180 * causes the count to become 0 also causes a stream to be added. The
2181 * channel deletion will thus be triggered by the following removal of this
2182 * stream.
2183 */
ffe60014 2184 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
2185 /* Increment refcount before decrementing nb_init_stream_left */
2186 cmm_smp_wmb();
ffe60014 2187 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
2188 }
2189
d88aee68 2190 lttng_ht_add_unique_u64(ht, &stream->node);
ca22feea 2191
28ab034a 2192 lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
d8ef542d 2193
ca22feea
DG
2194 /*
2195 * Add stream to the stream_list_ht of the consumer data. No need to steal
2196 * the key since the HT does not use it and we allow adding redundant keys
2197 * into this table.
2198 */
28ab034a 2199 lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 2200
2e818a6a 2201 pthread_mutex_unlock(&stream->lock);
a9838785 2202 pthread_mutex_unlock(&stream->chan->lock);
ec6ea7d0 2203 pthread_mutex_unlock(&stream->chan->timer_lock);
fa29bfbf 2204 pthread_mutex_unlock(&the_consumer_data.lock);
fb3a43a9
DG
2205}
2206
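/*
 * Illustrative sketch (not lttng-tools code): consumer_add_metadata_stream()
 * and consumer_del_metadata_stream() both take their locks in one fixed
 * order (global consumer lock, then channel locks, then the stream lock) so
 * that two threads can never hold the same pair of locks in opposite order.
 * The same discipline, reduced to its skeleton:
 */
#include <pthread.h>

static pthread_mutex_t global_lock_sketch = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t channel_lock_sketch = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t stream_lock_sketch = PTHREAD_MUTEX_INITIALIZER;

static inline void locked_update_sketch(void)
{
	/* Always lock from the outermost scope to the innermost one... */
	pthread_mutex_lock(&global_lock_sketch);
	pthread_mutex_lock(&channel_lock_sketch);
	pthread_mutex_lock(&stream_lock_sketch);

	/* ... mutate the shared stream/channel state here ... */

	/* ... and release in the exact reverse order. */
	pthread_mutex_unlock(&stream_lock_sketch);
	pthread_mutex_unlock(&channel_lock_sketch);
	pthread_mutex_unlock(&global_lock_sketch);
}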
8994307f
DG
2207/*
2208 * Delete data stream that are flagged for deletion (endpoint_status).
2209 */
cd9adb8b 2210static void validate_endpoint_status_data_stream()
8994307f 2211{
8994307f
DG
2212 DBG("Consumer delete flagged data stream");
2213
c3ade133
JG
2214 for (auto *stream :
2215 lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
2216 decltype(lttng_consumer_stream::node),
2217 &lttng_consumer_stream::node>(*data_ht->ht)) {
2218 /* Validate delete flag of the stream */
2219 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2220 continue;
8994307f 2221 }
c3ade133
JG
2222 /* Delete it right now */
2223 consumer_del_stream(stream, data_ht);
8994307f 2224 }
8994307f
DG
2225}
2226
2227/*
2228 * Delete metadata stream that are flagged for deletion (endpoint_status).
2229 */
28ab034a 2230static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
8994307f 2231{
8994307f
DG
2232 DBG("Consumer delete flagged metadata stream");
2233
a0377dfe 2234 LTTNG_ASSERT(pollset);
8994307f 2235
c3ade133
JG
2236 for (auto *stream :
2237 lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
2238 decltype(lttng_consumer_stream::node),
2239 &lttng_consumer_stream::node>(*metadata_ht->ht)) {
2240 /* Validate delete flag of the stream */
2241 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2242 continue;
56047f5a 2243 }
c3ade133
JG
2244 /*
2245 * Remove from pollset so the metadata thread can continue without
2246 * blocking on a deleted stream.
2247 */
2248 lttng_poll_del(pollset, stream->wait_fd);
2249
2250 /* Delete it right now */
2251 consumer_del_metadata_stream(stream, metadata_ht);
8994307f 2252 }
8994307f
DG
2253}
2254
fb3a43a9
DG
2255/*
2256 * This thread polls the metadata file descriptors and writes the data to disk
2257 * or to the network.
2258 */
7d980def 2259void *consumer_thread_metadata_poll(void *data)
fb3a43a9 2260{
1fc79fb4 2261 int ret, i, pollfd, err = -1;
fb3a43a9 2262 uint32_t revents, nb_fd;
cd9adb8b 2263 struct lttng_consumer_stream *stream = nullptr;
fb3a43a9 2264 struct lttng_ht_iter iter;
d88aee68 2265 struct lttng_ht_node_u64 *node;
fb3a43a9 2266 struct lttng_poll_event events;
97535efa 2267 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
fb3a43a9
DG
2268 ssize_t len;
2269
2270 rcu_register_thread();
2271
1fc79fb4
MD
2272 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2273
2d57de81
MD
2274 if (testpoint(consumerd_thread_metadata)) {
2275 goto error_testpoint;
2276 }
2277
9ce5646a
MD
2278 health_code_update();
2279
fb3a43a9
DG
2280 DBG("Thread metadata poll started");
2281
fb3a43a9
DG
2282 /* Size is set to 1 for the consumer_metadata pipe */
2283 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2284 if (ret < 0) {
2285 ERR("Poll set creation failed");
d8ef542d 2286 goto end_poll;
fb3a43a9
DG
2287 }
2288
28ab034a 2289 ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
fb3a43a9
DG
2290 if (ret < 0) {
2291 goto end;
2292 }
2293
2294 /* Main loop */
2295 DBG("Metadata main loop started");
2296
cd9adb8b 2297 while (true) {
28ab034a 2298 restart:
7fa2082e 2299 health_code_update();
9ce5646a 2300 health_poll_entry();
7fa2082e 2301 DBG("Metadata poll wait");
fb3a43a9 2302 ret = lttng_poll_wait(&events, -1);
28ab034a 2303 DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2304 health_poll_exit();
40063ead 2305 DBG("Metadata event caught in thread");
fb3a43a9
DG
2306 if (ret < 0) {
2307 if (errno == EINTR) {
40063ead 2308 ERR("Poll EINTR caught");
fb3a43a9
DG
2309 goto restart;
2310 }
d9607cd7 2311 if (LTTNG_POLL_GETNB(&events) == 0) {
28ab034a 2312 err = 0; /* All is OK */
d9607cd7
MD
2313 }
2314 goto end;
fb3a43a9
DG
2315 }
2316
0d9c5d77
DG
2317 nb_fd = ret;
2318
e316aad5 2319 /* From here, the event is a metadata wait fd */
fb3a43a9 2320 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2321 health_code_update();
2322
fb3a43a9
DG
2323 revents = LTTNG_POLL_GETEV(&events, i);
2324 pollfd = LTTNG_POLL_GETFD(&events, i);
2325
13886d2d 2326 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
03e43155 2327 if (revents & LPOLLIN) {
13886d2d
DG
2328 ssize_t pipe_len;
2329
2330 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
28ab034a 2331 &stream,
5c7248cd
JG
2332 sizeof(stream)); /* NOLINT sizeof
2333 used on a
2334 pointer. */
2335 if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
2336 pointer. */
03e43155
MD
2337 if (pipe_len < 0) {
2338 PERROR("read metadata stream");
2339 }
fb3a43a9 2340 /*
28ab034a
JG
2341 * Remove the pipe from the poll set and continue
2342 * the loop since there might be data to consume.
fb3a43a9 2343 */
28ab034a
JG
2344 lttng_poll_del(
2345 &events,
2346 lttng_pipe_get_readfd(
2347 ctx->consumer_metadata_pipe));
03e43155 2348 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
fb3a43a9
DG
2349 continue;
2350 }
2351
8994307f 2352 /* A NULL stream means that the state has changed. */
cd9adb8b 2353 if (stream == nullptr) {
8994307f
DG
2354 /* Check for deleted streams. */
2355 validate_endpoint_status_metadata_stream(&events);
3714380f 2356 goto restart;
8994307f
DG
2357 }
2358
fb3a43a9 2359 DBG("Adding metadata stream %d to poll set",
28ab034a 2360 stream->wait_fd);
fb3a43a9 2361
fb3a43a9 2362 /* Add metadata stream to the global poll events list */
28ab034a
JG
2363 lttng_poll_add(
2364 &events, stream->wait_fd, LPOLLIN | LPOLLPRI);
2365 } else if (revents & (LPOLLERR | LPOLLHUP)) {
03e43155
MD
2366 DBG("Metadata thread pipe hung up");
2367 /*
2368 * Remove the pipe from the poll set and continue the loop
2369 * since there might be data to consume.
2370 */
28ab034a
JG
2371 lttng_poll_del(
2372 &events,
2373 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
03e43155
MD
2374 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2375 continue;
2376 } else {
28ab034a
JG
2377 ERR("Unexpected poll events %u for sock %d",
2378 revents,
2379 pollfd);
03e43155 2380 goto end;
fb3a43a9
DG
2381 }
2382
e316aad5 2383 /* Handle other stream */
fb3a43a9
DG
2384 continue;
2385 }
2386
07c4863f 2387 const lttng::urcu::read_lock_guard read_lock;
d88aee68
DG
2388 {
2389 uint64_t tmp_id = (uint64_t) pollfd;
2390
2391 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2392 }
00d7d903 2393 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
a0377dfe 2394 LTTNG_ASSERT(node);
fb3a43a9 2395
ac2c50cd 2396 stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
fb3a43a9 2397
03e43155
MD
2398 if (revents & (LPOLLIN | LPOLLPRI)) {
2399 /* Get the data out of the metadata file descriptor */
2400 DBG("Metadata available on fd %d", pollfd);
a0377dfe 2401 LTTNG_ASSERT(stream->wait_fd == pollfd);
03e43155
MD
2402
2403 do {
2404 health_code_update();
2405
6f9449c2 2406 len = ctx->on_buffer_ready(stream, ctx, false);
03e43155
MD
2407 /*
2408 * We don't check the return value here since if we get
83f4233d 2409 * a negative len, it means an error occurred thus we
03e43155
MD
2410 * simply remove it from the poll set and free the
2411 * stream.
2412 */
2413 } while (len > 0);
2414
2415 /* It's ok to have an unavailable sub-buffer */
2416 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2417 /* Clean up stream from consumer and free it. */
2418 lttng_poll_del(&events, stream->wait_fd);
2419 consumer_del_metadata_stream(stream, metadata_ht);
2420 }
2421 } else if (revents & (LPOLLERR | LPOLLHUP)) {
e316aad5 2422 DBG("Metadata fd %d is hup|err.", pollfd);
fa29bfbf 2423 if (!stream->hangup_flush_done &&
28ab034a
JG
2424 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2425 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
fb3a43a9
DG
2426 DBG("Attempting to flush and consume the UST buffers");
2427 lttng_ustconsumer_on_stream_hangup(stream);
2428
2429 /* We just flushed the stream now read it. */
4bb94b75 2430 do {
9ce5646a
MD
2431 health_code_update();
2432
6f9449c2 2433 len = ctx->on_buffer_ready(stream, ctx, false);
4bb94b75 2434 /*
28ab034a
JG
2435 * We don't check the return value here since if we
2436 * get a negative len, it means an error occurred
2437 * thus we simply remove it from the poll set and
2438 * free the stream.
4bb94b75
DG
2439 */
2440 } while (len > 0);
fb3a43a9
DG
2441 }
2442
fb3a43a9 2443 lttng_poll_del(&events, stream->wait_fd);
e316aad5
DG
2444 /*
2445 * This call update the channel states, closes file descriptors
2446 * and securely free the stream.
2447 */
2448 consumer_del_metadata_stream(stream, metadata_ht);
03e43155
MD
2449 } else {
2450 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
03e43155 2451 goto end;
fb3a43a9 2452 }
e316aad5 2453 /* Release RCU lock for the stream looked up */
fb3a43a9
DG
2454 }
2455 }
2456
1fc79fb4
MD
2457 /* All is OK */
2458 err = 0;
fb3a43a9
DG
2459end:
2460 DBG("Metadata poll thread exiting");
fb3a43a9 2461
d8ef542d
MD
2462 lttng_poll_clean(&events);
2463end_poll:
2d57de81 2464error_testpoint:
1fc79fb4
MD
2465 if (err) {
2466 health_error();
2467 ERR("Health error occurred in %s", __func__);
2468 }
2469 health_unregister(health_consumerd);
fb3a43a9 2470 rcu_unregister_thread();
cd9adb8b 2471 return nullptr;
fb3a43a9
DG
2472}
2473
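/*
 * Illustrative sketch (not lttng-tools code): the metadata thread receives
 * whole stream pointers over a pipe; the writer pushes sizeof(pointer) bytes
 * and the reader pulls exactly the same amount, which is what the
 * lttng_pipe_read(..., &stream, sizeof(stream)) calls above rely on. POSIX
 * guarantees that writes of up to PIPE_BUF bytes to a pipe are atomic, which
 * is what makes this hand-off safe between threads.
 */
#include <unistd.h>

static inline int send_stream_ptr_sketch(int pipe_write_fd, struct lttng_consumer_stream *stream)
{
	const ssize_t ret = write(pipe_write_fd, &stream, sizeof(stream));

	return ret == (ssize_t) sizeof(stream) ? 0 : -1;
}

static inline int recv_stream_ptr_sketch(int pipe_read_fd, struct lttng_consumer_stream **stream)
{
	const ssize_t ret = read(pipe_read_fd, stream, sizeof(*stream));

	return ret == (ssize_t) sizeof(*stream) ? 0 : -1;
}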
3bd1e081 2474/*
e4421fec 2475 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2476 * it to tracefile if necessary.
2477 */
7d980def 2478void *consumer_thread_data_poll(void *data)
3bd1e081 2479{
ff930959 2480 int num_rdy, high_prio, ret, i, err = -1;
cd9adb8b 2481 struct pollfd *pollfd = nullptr;
3bd1e081 2482 /* local view of the streams */
cd9adb8b 2483 struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
3bd1e081 2484 /* local view of consumer_data.fds_count */
8bdcc002
JG
2485 int nb_fd = 0;
2486 /* 2 for the consumer_data_pipe and wake up pipe */
2487 const int nb_pipes_fd = 2;
9a2fcf78
JD
2488 /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
2489 int nb_inactive_fd = 0;
97535efa 2490 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
00e2e675 2491 ssize_t len;
3bd1e081 2492
e7b994a3
DG
2493 rcu_register_thread();
2494
1fc79fb4
MD
2495 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2496
2d57de81
MD
2497 if (testpoint(consumerd_thread_data)) {
2498 goto error_testpoint;
2499 }
2500
9ce5646a
MD
2501 health_code_update();
2502
64803277 2503 local_stream = zmalloc<lttng_consumer_stream *>();
cd9adb8b 2504 if (local_stream == nullptr) {
4df6c8cb
MD
2505 PERROR("local_stream malloc");
2506 goto end;
2507 }
3bd1e081 2508
cd9adb8b 2509 while (true) {
9ce5646a
MD
2510 health_code_update();
2511
3bd1e081 2512 high_prio = 0;
3bd1e081
MD
2513
2514 /*
e4421fec 2515 * the fds set has been updated, we need to update our
3bd1e081
MD
2516 * local array as well
2517 */
fa29bfbf
SM
2518 pthread_mutex_lock(&the_consumer_data.lock);
2519 if (the_consumer_data.need_update) {
0e428499 2520 free(pollfd);
cd9adb8b 2521 pollfd = nullptr;
0e428499
DG
2522
2523 free(local_stream);
cd9adb8b 2524 local_stream = nullptr;
3bd1e081 2525
8bdcc002 2526 /* Allocate for all fds */
28ab034a
JG
2527 pollfd =
2528 calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2529 if (pollfd == nullptr) {
7a57cf92 2530 PERROR("pollfd malloc");
fa29bfbf 2531 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2532 goto end;
2533 }
2534
28ab034a
JG
2535 local_stream = calloc<lttng_consumer_stream *>(
2536 the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2537 if (local_stream == nullptr) {
7a57cf92 2538 PERROR("local_stream malloc");
fa29bfbf 2539 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2540 goto end;
2541 }
28ab034a
JG
2542 ret = update_poll_array(
2543 ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
3bd1e081
MD
2544 if (ret < 0) {
2545 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2546 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
fa29bfbf 2547 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2548 goto end;
2549 }
2550 nb_fd = ret;
fa29bfbf 2551 the_consumer_data.need_update = 0;
3bd1e081 2552 }
fa29bfbf 2553 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081 2554
4078b776 2555 /* No FDs left and consumer_quit is set: clean up and exit the thread. */
28ab034a
JG
2556 if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
2557 err = 0; /* All is OK */
4078b776
MD
2558 goto end;
2559 }
3bd1e081 2560 /* poll on the array of fds */
88f2b785 2561 restart:
261de637 2562 DBG("polling on %d fd", nb_fd + nb_pipes_fd);
cf0bcb51
JG
2563 if (testpoint(consumerd_thread_data_poll)) {
2564 goto end;
2565 }
9ce5646a 2566 health_poll_entry();
261de637 2567 num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
9ce5646a 2568 health_poll_exit();
3bd1e081
MD
2569 DBG("poll num_rdy : %d", num_rdy);
2570 if (num_rdy == -1) {
88f2b785
MD
2571 /*
2572 * Restart interrupted system call.
2573 */
2574 if (errno == EINTR) {
2575 goto restart;
2576 }
7a57cf92 2577 PERROR("Poll error");
f73fabfd 2578 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2579 goto end;
2580 } else if (num_rdy == 0) {
2581 DBG("Polling thread timed out");
2582 goto end;
2583 }
2584
80957876
JG
2585 if (caa_unlikely(data_consumption_paused)) {
2586 DBG("Data consumption paused, sleeping...");
2587 sleep(1);
2588 goto restart;
2589 }
2590
3bd1e081 2591 /*
50f8ae69 2592 * If the consumer_data_pipe triggered poll, go directly to the
00e2e675
DG
2593 * beginning of the loop to update the array. We want to prioritize
2594 * array update over low-priority reads.
3bd1e081 2595 */
509bb1cf 2596 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
ab30f567 2597 ssize_t pipe_readlen;
04fdd819 2598
50f8ae69 2599 DBG("consumer_data_pipe wake up");
5c7248cd
JG
2600 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2601 &new_stream,
2602 sizeof(new_stream)); /* NOLINT sizeof used on
2603 a pointer. */
2604 if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
2605 */
6cd525e8 2606 PERROR("Consumer data pipe");
23f5f35d
DG
2607 /* Continue so we can at least handle the current stream(s). */
2608 continue;
2609 }
c869f647
DG
2610
2611 /*
2612 * If the stream is NULL, just ignore it. It's also possible that
2613 * the sessiond poll thread changed the consumer_quit state and is
2614 * waking us up to test it.
2615 */
cd9adb8b 2616 if (new_stream == nullptr) {
8994307f 2617 validate_endpoint_status_data_stream();
c869f647
DG
2618 continue;
2619 }
2620
c869f647 2621 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2622 continue;
2623 }
2624
02b3d176
DG
2625 /* Handle wakeup pipe. */
2626 if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
2627 char dummy;
2628 ssize_t pipe_readlen;
2629
28ab034a
JG
2630 pipe_readlen =
2631 lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
02b3d176
DG
2632 if (pipe_readlen < 0) {
2633 PERROR("Consumer data wakeup pipe");
2634 }
2635 /* We've been awakened to handle stream(s). */
2636 ctx->has_wakeup = 0;
2637 }
2638
3bd1e081
MD
2639 /* Take care of high priority channels first. */
2640 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2641 health_code_update();
2642
cd9adb8b 2643 if (local_stream[i] == nullptr) {
9617607b
DG
2644 continue;
2645 }
fb3a43a9 2646 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2647 DBG("Urgent read on fd %d", pollfd[i].fd);
2648 high_prio = 1;
6f9449c2 2649 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
d41f73b7 2650 /* it's ok to have an unavailable sub-buffer */
b64403e3 2651 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2652 /* Clean the stream and free it. */
2653 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2654 local_stream[i] = nullptr;
4078b776 2655 } else if (len > 0) {
28ab034a
JG
2656 local_stream[i]->has_data_left_to_be_read_before_teardown =
2657 1;
d41f73b7 2658 }
3bd1e081
MD
2659 }
2660 }
2661
4078b776
MD
2662 /*
2663 * If we read high prio channel in this loop, try again
2664 * for more high prio data.
2665 */
2666 if (high_prio) {
3bd1e081
MD
2667 continue;
2668 }
2669
2670 /* Take care of low priority channels. */
4078b776 2671 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2672 health_code_update();
2673
cd9adb8b 2674 if (local_stream[i] == nullptr) {
9617607b
DG
2675 continue;
2676 }
28ab034a
JG
2677 if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
2678 local_stream[i]->has_data) {
4078b776 2679 DBG("Normal read on fd %d", pollfd[i].fd);
6f9449c2 2680 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
4078b776 2681 /* it's ok to have an unavailable sub-buffer */
b64403e3 2682 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2683 /* Clean the stream and free it. */
2684 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2685 local_stream[i] = nullptr;
4078b776 2686 } else if (len > 0) {
28ab034a
JG
2687 local_stream[i]->has_data_left_to_be_read_before_teardown =
2688 1;
4078b776
MD
2689 }
2690 }
2691 }
2692
2693 /* Handle hangup and errors */
2694 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2695 health_code_update();
2696
cd9adb8b 2697 if (local_stream[i] == nullptr) {
9617607b
DG
2698 continue;
2699 }
28ab034a
JG
2700 if (!local_stream[i]->hangup_flush_done &&
2701 (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
2702 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2703 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
4078b776 2704 DBG("fd %d is hup|err|nval. Attempting flush and read.",
28ab034a 2705 pollfd[i].fd);
4078b776
MD
2706 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2707 /* Attempt read again, for the data we just flushed. */
c715ddc9 2708 local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
4078b776
MD
2709 }
2710 /*
c715ddc9
JG
2711 * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
2712 * performed. This type of flush ensures that a new packet is produced no
2713 * matter what the consumed/produced positions are.
2714 *
2715 * This, in turn, causes the next pass to see that data is available for the
2716 * stream. When we come back here, we can be assured that all available
2717 * data has been consumed and we can finally destroy the stream.
2718 *
4078b776
MD
2719 * If the poll flag is HUP/ERR/NVAL and we have
2720 * read no data in this pass, we can remove the
2721 * stream from its hash table.
2722 */
2723 if ((pollfd[i].revents & POLLHUP)) {
2724 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
c715ddc9 2725 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2726 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2727 local_stream[i] = nullptr;
4078b776
MD
2728 }
2729 } else if (pollfd[i].revents & POLLERR) {
2730 ERR("Error returned in polling fd %d.", pollfd[i].fd);
c715ddc9 2731 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2732 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2733 local_stream[i] = nullptr;
4078b776
MD
2734 }
2735 } else if (pollfd[i].revents & POLLNVAL) {
2736 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
c715ddc9 2737 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2738 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2739 local_stream[i] = nullptr;
3bd1e081
MD
2740 }
2741 }
cd9adb8b 2742 if (local_stream[i] != nullptr) {
c715ddc9 2743 local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
9617607b 2744 }
3bd1e081
MD
2745 }
2746 }
1fc79fb4
MD
2747 /* All is OK */
2748 err = 0;
3bd1e081
MD
2749end:
2750 DBG("polling thread exiting");
0e428499
DG
2751 free(pollfd);
2752 free(local_stream);
fb3a43a9
DG
2753
2754 /*
2755 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2756 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2757 * read side of the pipe. If we close them both, epoll_wait strangely does
2758 * not return and could create an endless wait period if the pipe is the
2759 * only tracked fd in the poll set. The thread will take care of closing
2760 * the read side.
fb3a43a9 2761 */
13886d2d 2762 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
fb3a43a9 2763
2d57de81 2764error_testpoint:
1fc79fb4
MD
2765 if (err) {
2766 health_error();
2767 ERR("Health error occurred in %s", __func__);
2768 }
2769 health_unregister(health_consumerd);
2770
e7b994a3 2771 rcu_unregister_thread();
cd9adb8b 2772 return nullptr;
3bd1e081
MD
2773}
2774
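/*
 * Illustrative sketch (not lttng-tools code): the data thread's main loop is
 * a classic poll() pattern, restart on EINTR, then service urgent (POLLPRI)
 * descriptors before the regular (POLLIN) ones, exactly the ordering used
 * above. Reduced to its skeleton:
 */
#include <errno.h>
#include <poll.h>

static int poll_streams_once_sketch(struct pollfd *fds, nfds_t nb_fd)
{
	int num_ready;

	do {
		num_ready = poll(fds, nb_fd, -1);
	} while (num_ready == -1 && errno == EINTR); /* Restart interrupted syscall. */

	if (num_ready <= 0) {
		return num_ready; /* Error or timeout; the caller decides. */
	}

	/* First pass: urgent data only. */
	for (nfds_t i = 0; i < nb_fd; i++) {
		if (fds[i].revents & POLLPRI) {
			/* ... consume the high-priority sub-buffer here ... */
		}
	}

	/* Second pass: regular data, then hangups and errors. */
	for (nfds_t i = 0; i < nb_fd; i++) {
		if (fds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
			/* ... consume remaining data, tear down dead streams ... */
		}
	}

	return num_ready;
}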
d8ef542d
MD
2775/*
2776 * Close wake-up end of each stream belonging to the channel. This will
2777 * allow the poll() on the stream read-side to detect when the
2778 * write-side (application) finally closes them.
2779 */
28ab034a 2780static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
d8ef542d 2781{
3d46ea1a
JG
2782 const auto ht = the_consumer_data.stream_per_chan_id_ht;
2783
2784 for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
2785 lttng_consumer_stream,
2786 decltype(lttng_consumer_stream::node_channel_id),
2787 &lttng_consumer_stream::node_channel_id,
2788 std::uint64_t>(*ht->ht,
2789 &channel->key,
2790 ht->hash_fct(&channel->key, lttng_ht_seed),
2791 ht->match_fct)) {
f2ad556d
MD
2792 /*
2793 * Protect against teardown with mutex.
2794 */
2795 pthread_mutex_lock(&stream->lock);
2796 if (cds_lfht_is_node_deleted(&stream->node.node)) {
2797 goto next;
2798 }
fa29bfbf 2799 switch (the_consumer_data.type) {
d8ef542d
MD
2800 case LTTNG_CONSUMER_KERNEL:
2801 break;
2802 case LTTNG_CONSUMER32_UST:
2803 case LTTNG_CONSUMER64_UST:
b4a650f3
DG
2804 if (stream->metadata_flag) {
2805 /* Safe and protected by the stream lock. */
2806 lttng_ustconsumer_close_metadata(stream->chan);
2807 } else {
2808 /*
2809 * Note: a mutex is taken internally within
2810 * liblttng-ust-ctl to protect timer wakeup_fd
2811 * use from concurrent close.
2812 */
2813 lttng_ustconsumer_close_stream_wakeup(stream);
2814 }
d8ef542d
MD
2815 break;
2816 default:
2817 ERR("Unknown consumer_data type");
a0377dfe 2818 abort();
d8ef542d 2819 }
f2ad556d
MD
2820 next:
2821 pthread_mutex_unlock(&stream->lock);
d8ef542d 2822 }
d8ef542d
MD
2823}
2824
2825static void destroy_channel_ht(struct lttng_ht *ht)
2826{
cd9adb8b 2827 if (ht == nullptr) {
d8ef542d
MD
2828 return;
2829 }
2830
c3ade133
JG
2831 for (auto *channel :
2832 lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
2833 decltype(lttng_consumer_channel::wait_fd_node),
2834 &lttng_consumer_channel::wait_fd_node>(*ht->ht)) {
2835 const auto ret = cds_lfht_del(ht->ht, &channel->node.node);
2836 LTTNG_ASSERT(ret != 0);
d8ef542d 2837 }
d8ef542d
MD
2838
2839 lttng_ht_destroy(ht);
2840}
2841
2842/*
2843 * This thread polls the channel fds to detect when they are being
2844 * closed. It closes all related streams if the channel is detected as
2845 * closed. It is currently only used as a shim layer for UST because the
2846 * consumerd needs to keep the per-stream wakeup end of pipes open for
2847 * periodical flush.
2848 */
2849void *consumer_thread_channel_poll(void *data)
2850{
1fc79fb4 2851 int ret, i, pollfd, err = -1;
d8ef542d 2852 uint32_t revents, nb_fd;
cd9adb8b 2853 struct lttng_consumer_channel *chan = nullptr;
d8ef542d
MD
2854 struct lttng_ht_iter iter;
2855 struct lttng_ht_node_u64 *node;
2856 struct lttng_poll_event events;
97535efa 2857 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
d8ef542d
MD
2858 struct lttng_ht *channel_ht;
2859
2860 rcu_register_thread();
2861
1fc79fb4
MD
2862 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
2863
2d57de81
MD
2864 if (testpoint(consumerd_thread_channel)) {
2865 goto error_testpoint;
2866 }
2867
9ce5646a
MD
2868 health_code_update();
2869
d8ef542d
MD
2870 channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2871 if (!channel_ht) {
2872 /* ENOMEM at this point. Better to bail out. */
2873 goto end_ht;
2874 }
2875
2876 DBG("Thread channel poll started");
2877
2878 /* Size is set to 1 for the consumer_channel pipe */
2879 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2880 if (ret < 0) {
2881 ERR("Poll set creation failed");
2882 goto end_poll;
2883 }
2884
2885 ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
2886 if (ret < 0) {
2887 goto end;
2888 }
2889
2890 /* Main loop */
2891 DBG("Channel main loop started");
2892
cd9adb8b 2893 while (true) {
28ab034a 2894 restart:
7fa2082e
MD
2895 health_code_update();
2896 DBG("Channel poll wait");
9ce5646a 2897 health_poll_entry();
d8ef542d 2898 ret = lttng_poll_wait(&events, -1);
28ab034a 2899 DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2900 health_poll_exit();
40063ead 2901 DBG("Channel event caught in thread");
d8ef542d
MD
2902 if (ret < 0) {
2903 if (errno == EINTR) {
40063ead 2904 ERR("Poll EINTR caught");
d8ef542d
MD
2905 goto restart;
2906 }
d9607cd7 2907 if (LTTNG_POLL_GETNB(&events) == 0) {
28ab034a 2908 err = 0; /* All is OK */
d9607cd7 2909 }
d8ef542d
MD
2910 goto end;
2911 }
2912
2913 nb_fd = ret;
2914
2915 /* From here, the event is a channel wait fd */
2916 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2917 health_code_update();
2918
d8ef542d
MD
2919 revents = LTTNG_POLL_GETEV(&events, i);
2920 pollfd = LTTNG_POLL_GETFD(&events, i);
2921
d8ef542d 2922 if (pollfd == ctx->consumer_channel_pipe[0]) {
03e43155 2923 if (revents & LPOLLIN) {
d8ef542d 2924 enum consumer_channel_action action;
a0cbdd2e 2925 uint64_t key;
d8ef542d 2926
a0cbdd2e 2927 ret = read_channel_pipe(ctx, &chan, &key, &action);
d8ef542d 2928 if (ret <= 0) {
03e43155
MD
2929 if (ret < 0) {
2930 ERR("Error reading channel pipe");
2931 }
28ab034a
JG
2932 lttng_poll_del(&events,
2933 ctx->consumer_channel_pipe[0]);
d8ef542d
MD
2934 continue;
2935 }
2936
2937 switch (action) {
2938 case CONSUMER_CHANNEL_ADD:
56047f5a 2939 {
28ab034a 2940 DBG("Adding channel %d to poll set", chan->wait_fd);
d8ef542d
MD
2941
2942 lttng_ht_node_init_u64(&chan->wait_fd_node,
28ab034a 2943 chan->wait_fd);
07c4863f 2944 const lttng::urcu::read_lock_guard read_lock;
d8ef542d 2945 lttng_ht_add_unique_u64(channel_ht,
28ab034a 2946 &chan->wait_fd_node);
d8ef542d 2947 /* Add channel to the global poll events list */
28ab034a
JG
2948 // FIXME: Empty flag on a pipe pollset, this might
2949 // hang on FreeBSD.
1524f98c 2950 lttng_poll_add(&events, chan->wait_fd, 0);
d8ef542d 2951 break;
56047f5a 2952 }
a0cbdd2e
MD
2953 case CONSUMER_CHANNEL_DEL:
2954 {
b4a650f3 2955 /*
28ab034a
JG
2956 * This command should never be called if the
2957 * channel has streams monitored by either the data
2958 * or metadata thread. The consumer only notifies this
2959 * thread with a channel del. command if it receives
2960 * a destroy channel command from the session daemon
2961 * which sends it if a command prior to the
2962 * GET_CHANNEL failed.
b4a650f3
DG
2963 */
2964
07c4863f 2965 const lttng::urcu::read_lock_guard read_lock;
a0cbdd2e
MD
2966 chan = consumer_find_channel(key);
2967 if (!chan) {
28ab034a
JG
2968 ERR("UST consumer get channel key %" PRIu64
2969 " not found for del channel",
2970 key);
a0cbdd2e
MD
2971 break;
2972 }
2973 lttng_poll_del(&events, chan->wait_fd);
f623cc0b 2974 iter.iter.node = &chan->wait_fd_node.node;
a0cbdd2e 2975 ret = lttng_ht_del(channel_ht, &iter);
a0377dfe 2976 LTTNG_ASSERT(ret == 0);
a0cbdd2e 2977
fa29bfbf 2978 switch (the_consumer_data.type) {
f2a444f1
DG
2979 case LTTNG_CONSUMER_KERNEL:
2980 break;
2981 case LTTNG_CONSUMER32_UST:
2982 case LTTNG_CONSUMER64_UST:
212d67a2 2983 health_code_update();
28ab034a
JG
2984 /* Destroy streams that might have been left
2985 * in the stream list. */
212d67a2 2986 clean_channel_stream_list(chan);
f2a444f1
DG
2987 break;
2988 default:
2989 ERR("Unknown consumer_data type");
a0377dfe 2990 abort();
f2a444f1
DG
2991 }
2992
a0cbdd2e 2993 /*
28ab034a
JG
2994 * Release our own refcount. Force channel deletion
2995 * even if streams were not initialized.
a0cbdd2e
MD
2996 */
2997 if (!uatomic_sub_return(&chan->refcount, 1)) {
2998 consumer_del_channel(chan);
2999 }
3000 goto restart;
3001 }
d8ef542d
MD
3002 case CONSUMER_CHANNEL_QUIT:
3003 /*
28ab034a
JG
3004 * Remove the pipe from the poll set and continue
3005 * the loop since there might be data to consume.
d8ef542d 3006 */
28ab034a
JG
3007 lttng_poll_del(&events,
3008 ctx->consumer_channel_pipe[0]);
d8ef542d
MD
3009 continue;
3010 default:
3011 ERR("Unknown action");
3012 break;
3013 }
03e43155
MD
3014 } else if (revents & (LPOLLERR | LPOLLHUP)) {
3015 DBG("Channel thread pipe hung up");
3016 /*
3017 * Remove the pipe from the poll set and continue the loop
3018 * since there might be data to consume.
3019 */
3020 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
3021 continue;
3022 } else {
28ab034a
JG
3023 ERR("Unexpected poll events %u for sock %d",
3024 revents,
3025 pollfd);
03e43155 3026 goto end;
d8ef542d
MD
3027 }
3028
3029 /* Handle other stream */
3030 continue;
3031 }
3032
07c4863f 3033 const lttng::urcu::read_lock_guard read_lock;
d8ef542d
MD
3034 {
3035 uint64_t tmp_id = (uint64_t) pollfd;
3036
3037 lttng_ht_lookup(channel_ht, &tmp_id, &iter);
3038 }
00d7d903 3039 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
a0377dfe 3040 LTTNG_ASSERT(node);
d8ef542d 3041
c3ade133
JG
3042 chan = lttng::utils::container_of(node,
3043 &lttng_consumer_channel::wait_fd_node);
d8ef542d
MD
3044
3045 /* Check for error event */
3046 if (revents & (LPOLLERR | LPOLLHUP)) {
3047 DBG("Channel fd %d is hup|err.", pollfd);
3048
3049 lttng_poll_del(&events, chan->wait_fd);
3050 ret = lttng_ht_del(channel_ht, &iter);
a0377dfe 3051 LTTNG_ASSERT(ret == 0);
b4a650f3
DG
3052
3053 /*
3054 * This will close the wait fd for each stream associated to
3055 * this channel AND monitored by the data/metadata thread, and thus
3056 * will be cleaned up by the right thread.
3057 */
d8ef542d 3058 consumer_close_channel_streams(chan);
f2ad556d
MD
3059
3060 /* Release our own refcount */
28ab034a
JG
3061 if (!uatomic_sub_return(&chan->refcount, 1) &&
3062 !uatomic_read(&chan->nb_init_stream_left)) {
f2ad556d
MD
3063 consumer_del_channel(chan);
3064 }
03e43155
MD
3065 } else {
3066 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
03e43155 3067 goto end;
d8ef542d
MD
3068 }
3069
3070 /* Release RCU lock for the channel looked up */
d8ef542d
MD
3071 }
3072 }
3073
1fc79fb4
MD
3074 /* All is OK */
3075 err = 0;
d8ef542d
MD
3076end:
3077 lttng_poll_clean(&events);
3078end_poll:
3079 destroy_channel_ht(channel_ht);
3080end_ht:
2d57de81 3081error_testpoint:
d8ef542d 3082 DBG("Channel poll thread exiting");
1fc79fb4
MD
3083 if (err) {
3084 health_error();
3085 ERR("Health error occurred in %s", __func__);
3086 }
3087 health_unregister(health_consumerd);
d8ef542d 3088 rcu_unregister_thread();
cd9adb8b 3089 return nullptr;
d8ef542d
MD
3090}
3091
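/*
 * Illustrative sketch (not lttng-tools code): channel teardown above follows
 * the "whoever drops the last reference frees the object" rule, expressed
 * with uatomic_sub_return(). The same idea with standard C++ atomics and a
 * hypothetical channel type:
 */
#include <atomic>

struct refcounted_channel_sketch {
	std::atomic<int> refcount{ 1 };
};

/* Returns true when the caller just released the last reference. */
static inline bool channel_put_sketch(refcounted_channel_sketch *chan)
{
	/* fetch_sub() returns the previous value; 1 means we were the last user. */
	return chan->refcount.fetch_sub(1, std::memory_order_acq_rel) == 1;
}

/* Usage: if (channel_put_sketch(chan)) { delete chan; } */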
331744e3 3092static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
28ab034a
JG
3093 struct pollfd *sockpoll,
3094 int client_socket)
331744e3
JD
3095{
3096 int ret;
3097
a0377dfe
FD
3098 LTTNG_ASSERT(ctx);
3099 LTTNG_ASSERT(sockpoll);
331744e3 3100
84382d49
MD
3101 ret = lttng_consumer_poll_socket(sockpoll);
3102 if (ret) {
331744e3
JD
3103 goto error;
3104 }
3105 DBG("Metadata connection on client_socket");
3106
3107 /* Blocking call, waiting for transmission */
3108 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3109 if (ctx->consumer_metadata_socket < 0) {
3110 WARN("On accept metadata");
3111 ret = -1;
3112 goto error;
3113 }
3114 ret = 0;
3115
3116error:
3117 return ret;
3118}
3119
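/*
 * Illustrative sketch (not lttng-tools code): set_metadata_socket() waits for
 * the command socket to become readable and then accept()s the second
 * (metadata) connection. With plain POSIX calls, the same two steps are:
 */
#include <poll.h>
#include <sys/socket.h>

static int wait_and_accept_sketch(int listen_fd)
{
	struct pollfd pfd;

	pfd.fd = listen_fd;
	pfd.events = POLLIN;
	pfd.revents = 0;

	/* Block until a connection is pending (no timeout). */
	if (poll(&pfd, 1, -1) <= 0) {
		return -1;
	}

	/* Blocking call, waiting for transmission, as in the function above. */
	return accept(listen_fd, nullptr, nullptr);
}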
3bd1e081
MD
3120/*
3121 * This thread listens on the consumerd socket and receives the file
3122 * descriptors from the session daemon.
3123 */
7d980def 3124void *consumer_thread_sessiond_poll(void *data)
3bd1e081 3125{
1fc79fb4 3126 int sock = -1, client_socket, ret, err = -1;
3bd1e081
MD
3127 /*
3128 * Structure used to poll for incoming data on the communication socket; this
3129 * avoids making the sockets blocking.
3130 */
3131 struct pollfd consumer_sockpoll[2];
97535efa 3132 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
3bd1e081 3133
e7b994a3
DG
3134 rcu_register_thread();
3135
1fc79fb4
MD
3136 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
3137
2d57de81
MD
3138 if (testpoint(consumerd_thread_sessiond)) {
3139 goto error_testpoint;
3140 }
3141
9ce5646a
MD
3142 health_code_update();
3143
3bd1e081
MD
3144 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
3145 unlink(ctx->consumer_command_sock_path);
3146 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
3147 if (client_socket < 0) {
3148 ERR("Cannot create command socket");
3149 goto end;
3150 }
3151
3152 ret = lttcomm_listen_unix_sock(client_socket);
3153 if (ret < 0) {
3154 goto end;
3155 }
3156
32258573 3157 DBG("Sending ready command to lttng-sessiond");
f73fabfd 3158 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
3bd1e081
MD
3159 /* return < 0 on error, but == 0 is not fatal */
3160 if (ret < 0) {
32258573 3161 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
3162 goto end;
3163 }
3164
3bd1e081
MD
3165 /* prepare the FDs to poll : to client socket and the should_quit pipe */
3166 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
3167 consumer_sockpoll[0].events = POLLIN | POLLPRI;
3168 consumer_sockpoll[1].fd = client_socket;
3169 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3170
84382d49
MD
3171 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3172 if (ret) {
3173 if (ret > 0) {
3174 /* should exit */
3175 err = 0;
3176 }
3bd1e081
MD
3177 goto end;
3178 }
3179 DBG("Connection on client_socket");
3180
3181 /* Blocking call, waiting for transmission */
3182 sock = lttcomm_accept_unix_sock(client_socket);
534d2592 3183 if (sock < 0) {
3bd1e081
MD
3184 WARN("On accept");
3185 goto end;
3186 }
3bd1e081 3187
331744e3
JD
3188 /*
3189 * Setup metadata socket which is the second socket connection on the
3190 * command unix socket.
3191 */
3192 ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
84382d49
MD
3193 if (ret) {
3194 if (ret > 0) {
3195 /* should exit */
3196 err = 0;
3197 }
331744e3
JD
3198 goto end;
3199 }
3200
d96f09c6
DG
3201 /* This socket is not useful anymore. */
3202 ret = close(client_socket);
3203 if (ret < 0) {
3204 PERROR("close client_socket");
3205 }
3206 client_socket = -1;
3207
3bd1e081
MD
3208 /* update the polling structure to poll on the established socket */
3209 consumer_sockpoll[1].fd = sock;
3210 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3211
cd9adb8b 3212 while (true) {
9ce5646a
MD
3213 health_code_update();
3214
3215 health_poll_entry();
3216 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3217 health_poll_exit();
84382d49
MD
3218 if (ret) {
3219 if (ret > 0) {
3220 /* should exit */
3221 err = 0;
3222 }
3bd1e081
MD
3223 goto end;
3224 }
3225 DBG("Incoming command on sock");
3226 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
4cbc1a04
DG
3227 if (ret <= 0) {
3228 /*
3229 * This could simply be a session daemon quitting. Don't output
3230 * ERR() here.
3231 */
3232 DBG("Communication interrupted on command socket");
41ba6035 3233 err = 0;
3bd1e081
MD
3234 goto end;
3235 }
10211f5c 3236 if (CMM_LOAD_SHARED(consumer_quit)) {
3bd1e081 3237 DBG("consumer_thread_receive_fds received quit from signal");
28ab034a 3238 err = 0; /* All is OK */
3bd1e081
MD
3239 goto end;
3240 }
a1ed855a 3241 DBG("Received command on sock");
3bd1e081 3242 }
1fc79fb4
MD
3243 /* All is OK */
3244 err = 0;
3245
3bd1e081 3246end:
ffe60014 3247 DBG("Consumer thread sessiond poll exiting");
3bd1e081 3248
d88aee68
DG
3249 /*
3250 * Close metadata streams since the producer is the session daemon which
3251 * just died.
3252 *
3253 * NOTE: for now, this only applies to the UST tracer.
3254 */
6d574024 3255 lttng_consumer_close_all_metadata();
d88aee68 3256
3bd1e081
MD
3257 /*
3258 * when all fds have hung up, the polling thread
3259 * can exit cleanly
3260 */
10211f5c 3261 CMM_STORE_SHARED(consumer_quit, 1);
3bd1e081 3262
04fdd819 3263 /*
c869f647 3264 * Notify the data poll thread to poll back again and test the
3265 * consumer_quit state that we just set so as to quit gracefully.
04fdd819 3266 */
acdb9057 3267 notify_thread_lttng_pipe(ctx->consumer_data_pipe);
c869f647 3268
cd9adb8b 3269 notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);
d8ef542d 3270
5c635c72
MD
3271 notify_health_quit_pipe(health_quit_pipe);
3272
d96f09c6
DG
3273 /* Cleaning up possibly open sockets. */
3274 if (sock >= 0) {
3275 ret = close(sock);
3276 if (ret < 0) {
3277 PERROR("close sock sessiond poll");
3278 }
3279 }
3280 if (client_socket >= 0) {
38476d24 3281 ret = close(client_socket);
d96f09c6
DG
3282 if (ret < 0) {
3283 PERROR("close client_socket sessiond poll");
3284 }
3285 }
3286
2d57de81 3287error_testpoint:
1fc79fb4
MD
3288 if (err) {
3289 health_error();
3290 ERR("Health error occurred in %s", __func__);
3291 }
3292 health_unregister(health_consumerd);
3293
e7b994a3 3294 rcu_unregister_thread();
cd9adb8b 3295 return nullptr;
3bd1e081 3296}
d41f73b7 3297
503fefca 3298static int post_consume(struct lttng_consumer_stream *stream,
28ab034a
JG
3299 const struct stream_subbuffer *subbuffer,
3300 struct lttng_consumer_local_data *ctx)
f96af312 3301{
503fefca 3302 size_t i;
f96af312 3303 int ret = 0;
28ab034a
JG
3304 const size_t count =
3305 lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
f96af312 3306
503fefca
JG
3307 for (i = 0; i < count; i++) {
3308 const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
28ab034a 3309 &stream->read_subbuffer_ops.post_consume_cbs, i);
503fefca
JG
3310
3311 ret = op(stream, subbuffer, ctx);
3312 if (ret) {
3313 goto end;
f96af312 3314 }
f96af312 3315 }
f96af312
JG
3316end:
3317 return ret;
3318}
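/*
 * Illustrative note (not part of the original source): post_consume() above
 * simply walks the stream's post_consume_cbs dynamic array in registration
 * order and stops at the first callback returning non-zero. From the call
 * `op(stream, subbuffer, ctx)`, a callback is assumed to be compatible with:
 *
 *     int my_post_consume_cb(struct lttng_consumer_stream *stream,
 *                            const struct stream_subbuffer *subbuffer,
 *                            struct lttng_consumer_local_data *ctx);
 *
 * and is registered elsewhere (when the stream's read_subbuffer_ops are set
 * up) by appending its address to read_subbuffer_ops.post_consume_cbs with
 * lttng_dynamic_array_add_element(). `my_post_consume_cb` is a hypothetical
 * name used only for illustration.
 */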
3319
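/*
 * Overview of lttng_consumer_read_subbuffer() below (a reading aid derived
 * from the function itself; no additional behaviour implied): with the
 * stream lock held, the pipeline is on_wake_up -> rotate (if the stream was
 * already flagged rotate_ready) -> get_next_subbuffer ->
 * pre_consume_subbuffer -> consume_subbuffer -> put_next_subbuffer ->
 * post_consume -> rotate (if the stream just became ready) -> on_sleep.
 * Any failure between get_next_subbuffer and put_next_subbuffer releases
 * the sub-buffer through the error_put_subbuf path, and the number of bytes
 * written by consume_subbuffer is what is returned on success.
 */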
4078b776 3320ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
28ab034a
JG
3321 struct lttng_consumer_local_data *ctx,
3322 bool locked_by_caller)
d41f73b7 3323{
12bddd1d 3324 ssize_t ret, written_bytes = 0;
23d56598 3325 int rotation_ret;
6f9449c2 3326 struct stream_subbuffer subbuffer = {};
b6797c8e 3327 enum get_next_subbuffer_status get_next_status;
74251bb8 3328
6f9449c2
JG
3329 if (!locked_by_caller) {
3330 stream->read_subbuffer_ops.lock(stream);
947bd097
JR
3331 } else {
3332 stream->read_subbuffer_ops.assert_locked(stream);
6f9449c2
JG
3333 }
3334
3335 if (stream->read_subbuffer_ops.on_wake_up) {
3336 ret = stream->read_subbuffer_ops.on_wake_up(stream);
3337 if (ret) {
3338 goto end;
3339 }
94d49140 3340 }
74251bb8 3341
23d56598
JG
3342 /*
3343 * If the stream was flagged to be ready for rotation before we extract
3344 * the next packet, rotate it now.
3345 */
3346 if (stream->rotate_ready) {
3347 DBG("Rotate stream before consuming data");
f46376a1 3348 ret = lttng_consumer_rotate_stream(stream);
23d56598
JG
3349 if (ret < 0) {
3350 ERR("Stream rotation error before consuming data");
3351 goto end;
3352 }
3353 }
3354
28ab034a 3355 get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
b6797c8e
JG
3356 switch (get_next_status) {
3357 case GET_NEXT_SUBBUFFER_STATUS_OK:
3358 break;
3359 case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
3360 /* Not an error. */
3361 ret = 0;
3362 goto sleep_stream;
3363 case GET_NEXT_SUBBUFFER_STATUS_ERROR:
3364 ret = -1;
6f9449c2 3365 goto end;
b6797c8e
JG
3366 default:
3367 abort();
d41f73b7 3368 }
74251bb8 3369
28ab034a 3370 ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
6f9449c2
JG
3371 if (ret) {
3372 goto error_put_subbuf;
3373 }
3374
28ab034a 3375 written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
514775d9
FD
3376 if (written_bytes <= 0) {
3377 ERR("Error consuming subbuffer: (%zd)", written_bytes);
3378 ret = (int) written_bytes;
3379 goto error_put_subbuf;
6f9449c2
JG
3380 }
3381
3382 ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
3383 if (ret) {
23d56598
JG
3384 goto end;
3385 }
3386
503fefca
JG
3387 ret = post_consume(stream, &subbuffer, ctx);
3388 if (ret) {
3389 goto end;
6f9449c2
JG
3390 }
3391
23d56598
JG
3392 /*
3393 * After extracting the packet, we check if the stream is now ready to
3394 * be rotated and perform the action immediately.
3395 *
3396 * Don't overwrite `ret` as callers expect the number of bytes
3397 * consumed to be returned on success.
3398 */
3399 rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
3400 if (rotation_ret == 1) {
f46376a1 3401 rotation_ret = lttng_consumer_rotate_stream(stream);
23d56598
JG
3402 if (rotation_ret < 0) {
3403 ret = rotation_ret;
3404 ERR("Stream rotation error after consuming data");
3405 goto end;
3406 }
503fefca 3407
23d56598
JG
3408 } else if (rotation_ret < 0) {
3409 ret = rotation_ret;
3410 ERR("Failed to check if stream was ready to rotate after consuming data");
3411 goto end;
3412 }
3413
82e72193 3414sleep_stream:
6f9449c2
JG
3415 if (stream->read_subbuffer_ops.on_sleep) {
3416 stream->read_subbuffer_ops.on_sleep(stream, ctx);
3417 }
3418
3419 ret = written_bytes;
23d56598 3420end:
6f9449c2
JG
3421 if (!locked_by_caller) {
3422 stream->read_subbuffer_ops.unlock(stream);
94d49140 3423 }
6f9449c2 3424
74251bb8 3425 return ret;
6f9449c2
JG
3426error_put_subbuf:
3427 (void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
3428 goto end;
d41f73b7
MD
3429}
3430
3431int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3432{
fa29bfbf 3433 switch (the_consumer_data.type) {
d41f73b7
MD
3434 case LTTNG_CONSUMER_KERNEL:
3435 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
3436 case LTTNG_CONSUMER32_UST:
3437 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
3438 return lttng_ustconsumer_on_recv_stream(stream);
3439 default:
3440 ERR("Unknown consumer_data type");
a0377dfe 3441 abort();
d41f73b7
MD
3442 return -ENOSYS;
3443 }
3444}
e4421fec
DG
3445
3446/*
3447 * Allocate and set consumer data hash tables.
3448 */
cd9adb8b 3449int lttng_consumer_init()
e4421fec 3450{
fa29bfbf
SM
3451 the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3452 if (!the_consumer_data.channel_ht) {
282dadbc
MD
3453 goto error;
3454 }
3455
28ab034a 3456 the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3457 if (!the_consumer_data.channels_by_session_id_ht) {
5c3892a6
JG
3458 goto error;
3459 }
3460
fa29bfbf
SM
3461 the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3462 if (!the_consumer_data.relayd_ht) {
282dadbc
MD
3463 goto error;
3464 }
3465
fa29bfbf
SM
3466 the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3467 if (!the_consumer_data.stream_list_ht) {
282dadbc
MD
3468 goto error;
3469 }
3470
28ab034a 3471 the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3472 if (!the_consumer_data.stream_per_chan_id_ht) {
282dadbc
MD
3473 goto error;
3474 }
3475
3476 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3477 if (!data_ht) {
3478 goto error;
3479 }
3480
3481 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3482 if (!metadata_ht) {
3483 goto error;
3484 }
3485
fa29bfbf
SM
3486 the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3487 if (!the_consumer_data.chunk_registry) {
28cc88f3
JG
3488 goto error;
3489 }
3490
282dadbc
MD
3491 return 0;
3492
3493error:
3494 return -1;
e4421fec 3495}
7735ef9e
DG
3496
3497/*
3498 * Process the ADD_RELAYD command receive by a consumer.
3499 *
3500 * This will create a relayd socket pair and add it to the relayd hash table.
3501 * The caller MUST acquire a RCU read side lock before calling it.
3502 */
4222116f 3503void consumer_add_relayd_socket(uint64_t net_seq_idx,
28ab034a
JG
3504 int sock_type,
3505 struct lttng_consumer_local_data *ctx,
3506 int sock,
3507 struct pollfd *consumer_sockpoll,
3508 uint64_t sessiond_id,
3509 uint64_t relayd_session_id,
3510 uint32_t relayd_version_major,
3511 uint32_t relayd_version_minor,
3512 enum lttcomm_sock_proto relayd_socket_protocol)
7735ef9e 3513{
cd2b09ed 3514 int fd = -1, ret = -1, relayd_created = 0;
0c759fc9 3515 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
cd9adb8b 3516 struct consumer_relayd_sock_pair *relayd = nullptr;
7735ef9e 3517
a0377dfe 3518 LTTNG_ASSERT(ctx);
4222116f 3519 LTTNG_ASSERT(sock >= 0);
48b7cdc2 3520 ASSERT_RCU_READ_LOCKED();
6151a90f 3521
da009f2c 3522 DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
7735ef9e
DG
3523
3524 /* Get relayd reference if exists. */
3525 relayd = consumer_find_relayd(net_seq_idx);
cd9adb8b 3526 if (relayd == nullptr) {
a0377dfe 3527 LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
7735ef9e
DG
3528 /* Not found. Allocate one. */
3529 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
cd9adb8b 3530 if (relayd == nullptr) {
618a6a28
MD
3531 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
3532 goto error;
0d08d75e 3533 } else {
30319bcb 3534 relayd->sessiond_session_id = sessiond_id;
0d08d75e 3535 relayd_created = 1;
7735ef9e 3536 }
0d08d75e
DG
3537
3538 /*
3539 * This code path MUST continue to the consumer send status message so
3540 * we can notify the session daemon and continue our work without
3541 * killing everything.
3542 */
da009f2c
MD
3543 } else {
3544 /*
3545 * relayd key should never be found for control socket.
3546 */
a0377dfe 3547 LTTNG_ASSERT(sock_type != LTTNG_STREAM_CONTROL);
0d08d75e
DG
3548 }
3549
3550 /* First send a status message before receiving the fds. */
0c759fc9 3551 ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
618a6a28 3552 if (ret < 0) {
0d08d75e 3553 /* Somehow, the session daemon is not responding anymore. */
618a6a28
MD
3554 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3555 goto error_nosignal;
7735ef9e
DG
3556 }
3557
3558 /* Poll on consumer socket. */
84382d49
MD
3559 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3560 if (ret) {
3561 /* Needing to exit in the middle of a command: error. */
0d08d75e 3562 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
618a6a28 3563 goto error_nosignal;
7735ef9e
DG
3564 }
3565
3566 /* Get relayd socket from session daemon */
3567 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
3568 if (ret != sizeof(fd)) {
28ab034a 3569 fd = -1; /* Just in case it gets set with an invalid value. */
0d08d75e
DG
3570
3571 /*
3572 * Failing to receive FDs might indicate a major problem such as
3573 * reaching a fd limit during the receive where the kernel returns a
3574 * MSG_CTRUNC and fails to clean up the fd in the queue. In any case, we
3575 * don't take any chances and stop everything.
3576 *
3577 * XXX: Feature request #558 will fix that and avoid this possible
3578 * issue when reaching the fd limit.
3579 */
3580 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
618a6a28 3581 ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
f50f23d9
DG
3582 goto error;
3583 }
3584
7735ef9e
DG
3585 /* Copy socket information and received FD */
3586 switch (sock_type) {
3587 case LTTNG_STREAM_CONTROL:
3588 /* Copy received lttcomm socket */
4222116f 3589 ret = lttcomm_populate_sock_from_open_socket(
28ab034a 3590 &relayd->control_sock.sock, fd, relayd_socket_protocol);
7735ef9e 3591
6151a90f 3592 /* Assign version values. */
4222116f
JR
3593 relayd->control_sock.major = relayd_version_major;
3594 relayd->control_sock.minor = relayd_version_minor;
c5b6f4f0 3595
d3e2ba59 3596 relayd->relayd_session_id = relayd_session_id;
c5b6f4f0 3597
7735ef9e
DG
3598 break;
3599 case LTTNG_STREAM_DATA:
3600 /* Copy received lttcomm socket */
4222116f 3601 ret = lttcomm_populate_sock_from_open_socket(
28ab034a 3602 &relayd->data_sock.sock, fd, relayd_socket_protocol);
6151a90f 3603 /* Assign version values. */
4222116f
JR
3604 relayd->data_sock.major = relayd_version_major;
3605 relayd->data_sock.minor = relayd_version_minor;
7735ef9e
DG
3606 break;
3607 default:
3608 ERR("Unknown relayd socket type (%d)", sock_type);
618a6a28 3609 ret_code = LTTCOMM_CONSUMERD_FATAL;
7735ef9e
DG
3610 goto error;
3611 }
3612
4222116f
JR
3613 if (ret < 0) {
3614 ret_code = LTTCOMM_CONSUMERD_FATAL;
3615 goto error;
3616 }
3617
d88aee68 3618 DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
28ab034a
JG
3619 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
3620 relayd->net_seq_idx,
3621 fd);
39d9954c
FD
3622 /*
3623 * We gave the ownership of the fd to the relayd structure. Set the
3624 * fd to -1 so we don't call close() on it in the error path below.
3625 */
3626 fd = -1;
7735ef9e 3627
618a6a28
MD
3628 /* We successfully added the socket. Send status back. */
3629 ret = consumer_send_status_msg(sock, ret_code);
3630 if (ret < 0) {
3631 /* Somehow, the session daemon is not responding anymore. */
3632 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3633 goto error_nosignal;
3634 }
3635
7735ef9e
DG
3636 /*
3637 * Add relayd socket pair to consumer data hashtable. If object already
3638 * exists or an error occurs, the function returns gracefully.
3639 */
9276e5c8 3640 relayd->ctx = ctx;
d09e1200 3641 add_relayd(relayd);
7735ef9e
DG
3642
3643 /* All good! */
2527bf85 3644 return;
7735ef9e
DG
3645
3646error:
618a6a28
MD
3647 if (consumer_send_status_msg(sock, ret_code) < 0) {
3648 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3649 }
3650
3651error_nosignal:
4028eeb9
DG
3652 /* Close received socket if valid. */
3653 if (fd >= 0) {
3654 if (close(fd)) {
3655 PERROR("close received socket");
3656 }
3657 }
cd2b09ed
DG
3658
3659 if (relayd_created) {
cd2b09ed
DG
3660 free(relayd);
3661 }
7735ef9e 3662}
ca22feea 3663
f7079f67
DG
3664/*
3665 * Search for a relayd associated with the session id and return the reference.
3666 *
3667 * An RCU read-side lock MUST be acquired before calling this function and held
3668 * until the relayd object is no longer necessary.
3669 */
3670static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3671{
f7079f67 3672 /* Iterate over all relayd since they are indexed by net_seq_idx. */
c3ade133
JG
3673 for (auto *relayd :
3674 lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
3675 decltype(consumer_relayd_sock_pair::node),
3676 &consumer_relayd_sock_pair::node>(
3677 *the_consumer_data.relayd_ht->ht)) {
18261bd1
DG
3678 /*
3679 * Check by sessiond id which is unique here where the relayd session
3680 * id might not be when having multiple relayd.
3681 */
3682 if (relayd->sessiond_session_id == id) {
f7079f67 3683 /* Found the relayd. There can be only one per id. */
c3ade133 3684 return relayd;
f7079f67
DG
3685 }
3686 }
3687
cd9adb8b 3688 return nullptr;
f7079f67
DG
3689}
3690
ca22feea
DG
3691/*
3692 * Check if, for a given session id, there is still data that needs to be extracted
3693 * from the buffers.
3694 *
6d805429 3695 * Return 1 if data is pending or else 0 meaning ready to be read.
ca22feea 3696 */
6d805429 3697int consumer_data_pending(uint64_t id)
ca22feea
DG
3698{
3699 int ret;
3d46ea1a 3700 const auto ht = the_consumer_data.stream_list_ht;
cd9adb8b 3701 struct consumer_relayd_sock_pair *relayd = nullptr;
6d805429 3702 int (*data_pending)(struct lttng_consumer_stream *);
ca22feea 3703
6d805429 3704 DBG("Consumer data pending command on session id %" PRIu64, id);
ca22feea 3705
3d46ea1a 3706 const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
ca22feea 3707
fa29bfbf 3708 switch (the_consumer_data.type) {
ca22feea 3709 case LTTNG_CONSUMER_KERNEL:
6d805429 3710 data_pending = lttng_kconsumer_data_pending;
ca22feea
DG
3711 break;
3712 case LTTNG_CONSUMER32_UST:
3713 case LTTNG_CONSUMER64_UST:
6d805429 3714 data_pending = lttng_ustconsumer_data_pending;
ca22feea
DG
3715 break;
3716 default:
3717 ERR("Unknown consumer data type");
a0377dfe 3718 abort();
ca22feea
DG
3719 }
3720
3d46ea1a
JG
3721 for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
3722 lttng_consumer_stream,
3723 decltype(lttng_consumer_stream::node_session_id),
3724 &lttng_consumer_stream::node_session_id,
3725 std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
3726 const lttng::pthread::lock_guard stream_lock(stream->lock);
ca22feea 3727
4e9a4686
DG
3728 /*
3729 * A removed node from the hash table indicates that the stream has
3730 * been deleted, which guarantees that the buffers are closed
3731 * on the consumer side. However, data can still be transmitted
3732 * over the network so don't skip the relayd check.
3733 */
3734 ret = cds_lfht_is_node_deleted(&stream->node.node);
3735 if (!ret) {
3736 /* Check the stream if there is data in the buffers. */
6d805429
DG
3737 ret = data_pending(stream);
3738 if (ret == 1) {
f7079f67 3739 goto data_pending;
4e9a4686
DG
3740 }
3741 }
d9f0c7c7
JR
3742 }
3743
3744 relayd = find_relayd_by_session_id(id);
3745 if (relayd) {
3746 unsigned int is_data_inflight = 0;
3747
3d46ea1a
JG
3748 const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex);
3749
d9f0c7c7 3750 /* Send init command for data pending. */
28ab034a 3751 ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
d9f0c7c7 3752 if (ret < 0) {
d9f0c7c7
JR
3753 /* Communication error with the relayd; assume no data is pending. */
3754 goto data_not_pending;
3755 }
3756
3d46ea1a
JG
3757 for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
3758 lttng_consumer_stream,
3759 decltype(lttng_consumer_stream::node_session_id),
3760 &lttng_consumer_stream::node_session_id,
3761 std::uint64_t>(
3762 *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
c8f59ee5 3763 if (stream->metadata_flag) {
ad7051c0 3764 ret = relayd_quiescent_control(&relayd->control_sock,
28ab034a 3765 stream->relayd_stream_id);
c8f59ee5 3766 } else {
6d805429 3767 ret = relayd_data_pending(&relayd->control_sock,
28ab034a
JG
3768 stream->relayd_stream_id,
3769 stream->next_net_seq_num - 1);
c8f59ee5 3770 }
d9f0c7c7
JR
3771
3772 if (ret == 1) {
d9f0c7c7
JR
3773 goto data_pending;
3774 } else if (ret < 0) {
28ab034a
JG
3775 ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
3776 relayd->net_seq_idx);
9276e5c8 3777 lttng_consumer_cleanup_relayd(relayd);
9276e5c8
JR
3778 goto data_not_pending;
3779 }
c8f59ee5 3780 }
f7079f67 3781
d9f0c7c7 3782 /* Send end command for data pending. */
28ab034a
JG
3783 ret = relayd_end_data_pending(
3784 &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
bdd88757 3785 if (ret < 0) {
28ab034a
JG
3786 ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
3787 relayd->net_seq_idx);
9276e5c8 3788 lttng_consumer_cleanup_relayd(relayd);
f7079f67
DG
3789 goto data_not_pending;
3790 }
bdd88757
DG
3791 if (is_data_inflight) {
3792 goto data_pending;
3793 }
f7079f67
DG
3794 }
3795
ca22feea 3796 /*
f7079f67
DG
3797 * Finding _no_ node in the hash table and no inflight data means that the
3798 * stream(s) have been removed; thus, data is guaranteed to be available for
3799 * analysis from the trace files.
ca22feea
DG
3800 */
3801
f7079f67 3802data_not_pending:
ca22feea 3803 /* Data is available to be read by a viewer. */
6d805429 3804 return 0;
ca22feea 3805
f7079f67 3806data_pending:
ca22feea 3807 /* Data is still being extracted from buffers. */
6d805429 3808 return 1;
ca22feea 3809}
f50f23d9
DG
3810
3811/*
3812 * Send a ret code status message to the sessiond daemon.
3813 *
3814 * Return the sendmsg() return value.
3815 */
3816int consumer_send_status_msg(int sock, int ret_code)
3817{
3818 struct lttcomm_consumer_status_msg msg;
3819
53efb85a 3820 memset(&msg, 0, sizeof(msg));
97535efa 3821 msg.ret_code = (lttcomm_return_code) ret_code;
f50f23d9
DG
3822
3823 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3824}
ffe60014
DG
3825
3826/*
3827 * Send a channel status message to the sessiond daemon.
3828 *
3829 * Return the sendmsg() return value.
3830 */
28ab034a 3831int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
ffe60014
DG
3832{
3833 struct lttcomm_consumer_status_channel msg;
3834
a0377dfe 3835 LTTNG_ASSERT(sock >= 0);
ffe60014 3836
53efb85a 3837 memset(&msg, 0, sizeof(msg));
ffe60014 3838 if (!channel) {
0c759fc9 3839 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
ffe60014 3840 } else {
0c759fc9 3841 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ffe60014
DG
3842 msg.key = channel->key;
3843 msg.stream_count = channel->streams.count;
3844 }
3845
3846 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3847}
5c786ded 3848
d07ceecd 3849unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
28ab034a
JG
3850 unsigned long produced_pos,
3851 uint64_t nb_packets_per_stream,
3852 uint64_t max_sb_size)
5c786ded 3853{
d07ceecd 3854 unsigned long start_pos;
5c786ded 3855
d07ceecd 3856 if (!nb_packets_per_stream) {
28ab034a 3857 return consumed_pos; /* Grab everything */
d07ceecd 3858 }
1cbd136b 3859 start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
d07ceecd
MD
3860 start_pos -= max_sb_size * nb_packets_per_stream;
3861 if ((long) (start_pos - consumed_pos) < 0) {
28ab034a 3862 return consumed_pos; /* Grab everything */
d07ceecd
MD
3863 }
3864 return start_pos;
5c786ded 3865}
a1ae2ea5 3866
c1dcb8bb
JG
3867/* Stream lock must be held by the caller. */
3868static int sample_stream_positions(struct lttng_consumer_stream *stream,
28ab034a
JG
3869 unsigned long *produced,
3870 unsigned long *consumed)
c1dcb8bb
JG
3871{
3872 int ret;
3873
3874 ASSERT_LOCKED(stream->lock);
3875
3876 ret = lttng_consumer_sample_snapshot_positions(stream);
3877 if (ret < 0) {
3878 ERR("Failed to sample snapshot positions");
3879 goto end;
3880 }
3881
3882 ret = lttng_consumer_get_produced_snapshot(stream, produced);
3883 if (ret < 0) {
3884 ERR("Failed to sample produced position");
3885 goto end;
3886 }
3887
3888 ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
3889 if (ret < 0) {
3890 ERR("Failed to sample consumed position");
3891 goto end;
3892 }
3893
3894end:
3895 return ret;
3896}
3897
b99a8d42
JD
3898/*
3899 * Sample the rotate position for all the streams of a channel. If a stream
3900 * is already at the rotate position (produced == consumed), we flag it as
3901 * ready for rotation. The rotation of ready streams occurs after we have
3902 * replied to the session daemon that we have finished sampling the positions.
92b7a7f8 3903 * Must be called with RCU read-side lock held to ensure existence of channel.
b99a8d42
JD
3904 *
3905 * Returns 0 on success, < 0 on error
3906 */
92b7a7f8 3907int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
28ab034a
JG
3908 uint64_t key,
3909 uint64_t relayd_id)
b99a8d42
JD
3910{
3911 int ret;
3d46ea1a 3912 const auto ht = the_consumer_data.stream_per_chan_id_ht;
c35f9726
JG
3913 struct lttng_dynamic_array stream_rotation_positions;
3914 uint64_t next_chunk_id, stream_count = 0;
3915 enum lttng_trace_chunk_status chunk_status;
3916 const bool is_local_trace = relayd_id == -1ULL;
cd9adb8b 3917 struct consumer_relayd_sock_pair *relayd = nullptr;
c35f9726 3918 bool rotating_to_new_chunk = true;
b32703d6
JG
3919 /* Array of `struct lttng_consumer_stream *` */
3920 struct lttng_dynamic_pointer_array streams_packet_to_open;
b99a8d42 3921
48b7cdc2
FD
3922 ASSERT_RCU_READ_LOCKED();
3923
b99a8d42
JD
3924 DBG("Consumer sample rotate position for channel %" PRIu64, key);
3925
cd9adb8b
JG
3926 lttng_dynamic_array_init(&stream_rotation_positions,
3927 sizeof(struct relayd_stream_rotation_position),
3928 nullptr);
3929 lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
c35f9726 3930
3d46ea1a 3931 const lttng::pthread::lock_guard channel_lock(channel->lock);
b99a8d42 3932
a0377dfe 3933 LTTNG_ASSERT(channel->trace_chunk);
28ab034a 3934 chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
c35f9726
JG
3935 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
3936 ret = -1;
3d46ea1a 3937 goto end;
c35f9726 3938 }
b99a8d42 3939
3d46ea1a
JG
3940 for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
3941 lttng_consumer_stream,
3942 decltype(lttng_consumer_stream::node_channel_id),
3943 &lttng_consumer_stream::node_channel_id,
3944 std::uint64_t>(*ht->ht,
3945 &channel->key,
3946 ht->hash_fct(&channel->key, lttng_ht_seed),
3947 ht->match_fct)) {
a40a503f 3948 unsigned long produced_pos = 0, consumed_pos = 0;
b99a8d42
JD
3949
3950 health_code_update();
3951
3952 /*
3953 * Lock stream because we are about to change its state.
3954 */
3d46ea1a 3955 const lttng::pthread::lock_guard stream_lock(stream->lock);
b99a8d42 3956
c35f9726
JG
3957 if (stream->trace_chunk == stream->chan->trace_chunk) {
3958 rotating_to_new_chunk = false;
3959 }
3960
a40a503f 3961 /*
c1dcb8bb 3962 * Do not flush a packet when rotating from a NULL trace
a9dde553 3963 * chunk. The stream has no means to output data, and the prior
c1dcb8bb
JG
3964 * rotation which rotated to NULL performed that side-effect
3965 * already. No new data can be produced when a stream has no
3966 * associated trace chunk (e.g. a stop followed by a rotate).
a40a503f 3967 */
a9dde553 3968 if (stream->trace_chunk) {
c1dcb8bb
JG
3969 bool flush_active;
3970
3971 if (stream->metadata_flag) {
3972 /*
3973 * Don't produce an empty metadata packet;
3974 * simply close the current one.
3975 *
3976 * Metadata is regenerated on every trace chunk
3977 * switch; there is no concern that no data was
3978 * produced.
3979 */
3980 flush_active = true;
3981 } else {
3982 /*
3983 * Only flush an empty packet if the "packet
3984 * open" could not be performed on transition
3985 * to a new trace chunk and no packets were
3986 * consumed within the chunk's lifetime.
3987 */
3988 if (stream->opened_packet_in_current_trace_chunk) {
3989 flush_active = true;
3990 } else {
3991 /*
3992 * Stream could have been full at the
3993 * time of rotation, but then have had
3994 * no activity at all.
3995 *
3996 * It is important to flush a packet
3997 * to prevent 0-length files from being
3998 * produced as most viewers choke on
3999 * them.
4000 *
4001 * Unfortunately viewers will not be
4002 * able to know that tracing was active
4003 * for this stream during this trace
4004 * chunk's lifetime.
4005 */
28ab034a
JG
4006 ret = sample_stream_positions(
4007 stream, &produced_pos, &consumed_pos);
c1dcb8bb 4008 if (ret) {
3d46ea1a 4009 goto end;
c1dcb8bb
JG
4010 }
4011
4012 /*
4013 * Don't flush an empty packet if data
4014 * was produced; it will be consumed
4015 * before the rotation completes.
4016 */
4017 flush_active = produced_pos != consumed_pos;
4018 if (!flush_active) {
c1dcb8bb
JG
4019 const char *trace_chunk_name;
4020 uint64_t trace_chunk_id;
4021
4022 chunk_status = lttng_trace_chunk_get_name(
28ab034a
JG
4023 stream->trace_chunk,
4024 &trace_chunk_name,
cd9adb8b 4025 nullptr);
c1dcb8bb
JG
4026 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
4027 trace_chunk_name = "none";
4028 }
4029
4030 /*
4031 * Consumer trace chunks are
4032 * never anonymous.
4033 */
4034 chunk_status = lttng_trace_chunk_get_id(
28ab034a 4035 stream->trace_chunk, &trace_chunk_id);
a0377dfe 4036 LTTNG_ASSERT(chunk_status ==
28ab034a 4037 LTTNG_TRACE_CHUNK_STATUS_OK);
c1dcb8bb
JG
4038
4039 DBG("Unable to open packet for stream during trace chunk's lifetime. "
28ab034a
JG
4040 "Flushing an empty packet to prevent an empty file from being created: "
4041 "stream id = %" PRIu64
4042 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
4043 stream->key,
4044 trace_chunk_name,
4045 trace_chunk_id);
c1dcb8bb
JG
4046 }
4047 }
4048 }
4049
a9dde553 4050 /*
c1dcb8bb
JG
4051 * Close the current packet before sampling the
4052 * ring buffer positions.
a9dde553 4053 */
c1dcb8bb 4054 ret = consumer_stream_flush_buffer(stream, flush_active);
a9dde553
MD
4055 if (ret < 0) {
4056 ERR("Failed to flush stream %" PRIu64 " during channel rotation",
28ab034a 4057 stream->key);
3d46ea1a 4058 goto end;
a9dde553 4059 }
b99a8d42
JD
4060 }
4061
a40a503f
MD
4062 ret = lttng_consumer_take_snapshot(stream);
4063 if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
4064 ERR("Failed to sample snapshot position during channel rotation");
3d46ea1a 4065 goto end;
b99a8d42 4066 }
a40a503f 4067 if (!ret) {
28ab034a 4068 ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
a40a503f
MD
4069 if (ret < 0) {
4070 ERR("Failed to sample produced position during channel rotation");
3d46ea1a 4071 goto end;
a40a503f 4072 }
b99a8d42 4073
28ab034a 4074 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
a40a503f
MD
4075 if (ret < 0) {
4076 ERR("Failed to sample consumed position during channel rotation");
3d46ea1a 4077 goto end;
a40a503f
MD
4078 }
4079 }
4080 /*
4081 * Align produced position on the start-of-packet boundary of the first
4082 * packet going into the next trace chunk.
4083 */
1cbd136b 4084 produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
a40a503f 4085 if (consumed_pos == produced_pos) {
f8528c7a 4086 DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
28ab034a
JG
4087 stream->key,
4088 produced_pos,
4089 consumed_pos);
b99a8d42 4090 stream->rotate_ready = true;
f8528c7a
MD
4091 } else {
4092 DBG("Different consumed and produced positions "
28ab034a
JG
4093 "for stream %" PRIu64 " produced = %lu consumed = %lu",
4094 stream->key,
4095 produced_pos,
4096 consumed_pos);
b99a8d42 4097 }
633d0182 4098 /*
a40a503f
MD
4099 * The rotation position is based on the packet_seq_num of the
4100 * packet following the last packet that was consumed for this
4101 * stream, incremented by the offset between produced and
4102 * consumed positions. This rotation position is a lower bound
4103 * (inclusive) at which the next trace chunk starts. Since it
4104 * is a lower bound, it is OK if the packet_seq_num does not
4105 * correspond exactly to the same packet identified by the
4106 * consumed_pos, which can happen in overwrite mode.
633d0182 4107 */
a40a503f
MD
4108 if (stream->sequence_number_unavailable) {
4109 /*
4110 * Rotation should never be performed on a session which
4111 * interacts with a pre-2.8 lttng-modules, which does
4112 * not implement packet sequence number.
4113 */
4114 ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
28ab034a 4115 stream->key);
a40a503f 4116 ret = -1;
3d46ea1a 4117 goto end;
b99a8d42 4118 }
a40a503f 4119 stream->rotate_position = stream->last_sequence_number + 1 +
28ab034a 4120 ((produced_pos - consumed_pos) / stream->max_sb_size);
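/*
 * Numerical illustration of the formula above (made-up values): if the last
 * consumed packet carried sequence number 41, max_sb_size is 4096 and the
 * producer is three sub-buffers ahead of the consumer
 * (produced_pos - consumed_pos = 12288), then
 * rotate_position = 41 + 1 + 12288 / 4096 = 45, i.e. the packet with
 * sequence number 45 is the first one that belongs to the next trace chunk
 * (a lower bound, as explained in the comment above).
 */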
f8528c7a 4121 DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
28ab034a
JG
4122 stream->key,
4123 stream->rotate_position);
b99a8d42 4124
c35f9726 4125 if (!is_local_trace) {
633d0182
JG
4126 /*
4127 * The relay daemon control protocol expects a rotation
4128 * position as "the sequence number of the first packet
a40a503f 4129 * _after_ the current trace chunk".
633d0182 4130 */
c35f9726
JG
4131 const struct relayd_stream_rotation_position position = {
4132 .stream_id = stream->relayd_stream_id,
a40a503f 4133 .rotate_at_seq_num = stream->rotate_position,
c35f9726
JG
4134 };
4135
28ab034a
JG
4136 ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
4137 &position);
c35f9726
JG
4138 if (ret) {
4139 ERR("Failed to allocate stream rotation position");
3d46ea1a 4140 goto end;
c35f9726
JG
4141 }
4142 stream_count++;
4143 }
f96af312
JG
4144
4145 stream->opened_packet_in_current_trace_chunk = false;
4146
4147 if (rotating_to_new_chunk && !stream->metadata_flag) {
4148 /*
4149 * Attempt to flush an empty packet as close to the
4150 * rotation point as possible. In the event where a
4151 * stream remains inactive after the rotation point,
4152 * this ensures that the new trace chunk has a
4153 * beginning timestamp set at the beginning of the
4154 * trace chunk instead of only creating an empty
4155 * packet when the trace chunk is stopped.
4156 *
4157 * This indicates to the viewers that the stream
4158 * was being recorded, but more importantly it
4159 * allows viewers to determine a usable trace
4160 * intersection.
4161 *
4162 * This presents a problem in the case where the
4163 * ring-buffer is completely full.
4164 *
4165 * Consider the following scenario:
4166 * - The consumption of data is slow (slow network,
4167 * for instance),
4168 * - The ring buffer is full,
4169 * - A rotation is initiated,
4170 * - The flush below does nothing (no space left to
4171 * open a new packet),
4172 * - The other streams rotate very soon, and new
4173 * data is produced in the new chunk,
4174 * - This stream completes its rotation long after the
4175 * rotation was initiated
4176 * - The session is stopped before any event can be
4177 * produced in this stream's buffers.
4178 *
4179 * The resulting trace chunk will have a single packet
4180 * temporally located at the end of the trace chunk for this
4181 * stream, making the stream intersection narrower
4182 * than it should be.
4183 *
4184 * To work around this, an empty flush is performed
4185 * after the first consumption of a packet during a
4186 * rotation if open_packet fails. The idea is that
4187 * consuming a packet frees enough space to switch
4188 * packets in this scenario and allows the tracer to
4189 * "stamp" the beginning of the new trace chunk at the
4190 * earliest possible point.
b32703d6
JG
4191 *
4192 * The packet open is performed after the channel
4193 * rotation to ensure that no attempt to open a packet
4194 * is performed in a stream that has no active trace
4195 * chunk.
f96af312 4196 */
28ab034a
JG
4197 ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
4198 stream);
b32703d6
JG
4199 if (ret) {
4200 PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
f96af312 4201 ret = -1;
3d46ea1a 4202 goto end;
f96af312
JG
4203 }
4204 }
b99a8d42 4205 }
b99a8d42 4206
b32703d6
JG
4207 if (!is_local_trace) {
4208 relayd = consumer_find_relayd(relayd_id);
4209 if (!relayd) {
4210 ERR("Failed to find relayd %" PRIu64, relayd_id);
4211 ret = -1;
3d46ea1a 4212 goto end;
b32703d6 4213 }
c35f9726 4214
b32703d6 4215 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a
JG
4216 ret = relayd_rotate_streams(&relayd->control_sock,
4217 stream_count,
cd9adb8b 4218 rotating_to_new_chunk ? &next_chunk_id : nullptr,
28ab034a
JG
4219 (const struct relayd_stream_rotation_position *)
4220 stream_rotation_positions.buffer.data);
b32703d6
JG
4221 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4222 if (ret < 0) {
4223 ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
28ab034a 4224 relayd->net_seq_idx);
b32703d6 4225 lttng_consumer_cleanup_relayd(relayd);
3d46ea1a 4226 goto end;
b32703d6 4227 }
c35f9726
JG
4228 }
4229
3d46ea1a 4230 for (std::size_t stream_idx = 0;
28ab034a
JG
4231 stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
4232 stream_idx++) {
b32703d6 4233 enum consumer_stream_open_packet_status status;
3d46ea1a 4234 auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
28ab034a 4235 &streams_packet_to_open, stream_idx);
b32703d6 4236
3d46ea1a 4237 const lttng::pthread::lock_guard stream_lock(stream->lock);
b32703d6 4238 status = consumer_stream_open_packet(stream);
b32703d6
JG
4239 switch (status) {
4240 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
4241 DBG("Opened a packet after a rotation: stream id = %" PRIu64
4242 ", channel name = %s, session id = %" PRIu64,
28ab034a
JG
4243 stream->key,
4244 stream->chan->name,
4245 stream->chan->session_id);
b32703d6
JG
4246 break;
4247 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
4248 /*
4249 * Can't open a packet as there is no space left
4250 * in the buffer. A new packet will be opened
4251 * once one has been consumed.
4252 */
4253 DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
4254 ", channel name = %s, session id = %" PRIu64,
28ab034a
JG
4255 stream->key,
4256 stream->chan->name,
4257 stream->chan->session_id);
b32703d6
JG
4258 break;
4259 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
4260 /* Logged by callee. */
4261 ret = -1;
3d46ea1a 4262 goto end;
b32703d6
JG
4263 default:
4264 abort();
4265 }
c35f9726
JG
4266 }
4267
b99a8d42 4268 ret = 0;
b99a8d42 4269end:
c35f9726 4270 lttng_dynamic_array_reset(&stream_rotation_positions);
b32703d6 4271 lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
b99a8d42
JD
4272 return ret;
4273}
4274
28ab034a 4275static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4276{
4277 int ret = 0;
4278 unsigned long consumed_pos_before, consumed_pos_after;
4279
4280 ret = lttng_consumer_sample_snapshot_positions(stream);
4281 if (ret < 0) {
4282 ERR("Taking snapshot positions");
4283 goto end;
4284 }
4285
4286 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4287 if (ret < 0) {
4288 ERR("Consumed snapshot position");
4289 goto end;
4290 }
4291
fa29bfbf 4292 switch (the_consumer_data.type) {
5f3aff8b
MD
4293 case LTTNG_CONSUMER_KERNEL:
4294 ret = kernctl_buffer_clear(stream->wait_fd);
4295 if (ret < 0) {
96393977 4296 ERR("Failed to clear kernel stream (ret = %d)", ret);
5f3aff8b
MD
4297 goto end;
4298 }
4299 break;
4300 case LTTNG_CONSUMER32_UST:
4301 case LTTNG_CONSUMER64_UST:
881fc67f
MD
4302 ret = lttng_ustconsumer_clear_buffer(stream);
4303 if (ret < 0) {
4304 ERR("Failed to clear ust stream (ret = %d)", ret);
4305 goto end;
4306 }
5f3aff8b
MD
4307 break;
4308 default:
4309 ERR("Unknown consumer_data type");
4310 abort();
4311 }
4312
4313 ret = lttng_consumer_sample_snapshot_positions(stream);
4314 if (ret < 0) {
4315 ERR("Taking snapshot positions");
4316 goto end;
4317 }
4318 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4319 if (ret < 0) {
4320 ERR("Consumed snapshot position");
4321 goto end;
4322 }
4323 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4324end:
4325 return ret;
4326}
4327
28ab034a 4328static int consumer_clear_stream(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4329{
4330 int ret;
4331
cd9adb8b 4332 ret = consumer_stream_flush_buffer(stream, true);
5f3aff8b 4333 if (ret < 0) {
28ab034a 4334 ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4335 ret = LTTCOMM_CONSUMERD_FATAL;
4336 goto error;
4337 }
4338
4339 ret = consumer_clear_buffer(stream);
4340 if (ret < 0) {
28ab034a 4341 ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4342 ret = LTTCOMM_CONSUMERD_FATAL;
4343 goto error;
4344 }
4345
4346 ret = LTTCOMM_CONSUMERD_SUCCESS;
4347error:
4348 return ret;
4349}
4350
28ab034a 4351static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
5f3aff8b 4352{
07c4863f 4353 const lttng::urcu::read_lock_guard read_lock;
a1a1df65
JG
4354 const lttng::pthread::lock_guard channel_lock(channel->lock);
4355
4356 for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
4357 &lttng_consumer_stream::send_node>(
4358 channel->streams.head)) {
5f3aff8b 4359 health_code_update();
a1a1df65
JG
4360
4361 const lttng::pthread::lock_guard stream_lock(stream->lock);
4362 const auto ret = consumer_clear_stream(stream);
5f3aff8b 4363 if (ret) {
a1a1df65 4364 return ret;
5f3aff8b 4365 }
5f3aff8b 4366 }
5f3aff8b 4367
a1a1df65 4368 return 0;
5f3aff8b
MD
4369}
4370
02d02e31
JD
4371/*
4372 * Check if a stream is ready to be rotated after extracting it.
4373 *
4374 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4375 * error. Stream lock must be held.
4376 */
4377int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4378{
28ab034a
JG
4379 DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
4380 " last_sequence_number %" PRIu64,
4381 stream->key,
4382 stream->rotate_ready,
4383 stream->rotate_position,
4384 stream->last_sequence_number);
02d02e31 4385 if (stream->rotate_ready) {
a40a503f 4386 return 1;
02d02e31
JD
4387 }
4388
4389 /*
a40a503f
MD
4390 * If packet seq num is unavailable, it means we are interacting
4391 * with a pre-2.8 lttng-modules which does not implement the
4392 * sequence number. Rotation should never be used by sessiond in this
4393 * scenario.
02d02e31 4394 */
a40a503f
MD
4395 if (stream->sequence_number_unavailable) {
4396 ERR("Internal error: rotation used on stream %" PRIu64
28ab034a
JG
4397 " with unavailable sequence number",
4398 stream->key);
a40a503f 4399 return -1;
02d02e31
JD
4400 }
4401
28ab034a 4402 if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
a40a503f 4403 return 0;
02d02e31
JD
4404 }
4405
a40a503f
MD
4406 /*
4407 * Rotate position not reached yet. The stream rotate position is
4408 * the position of the next packet belonging to the next trace chunk,
4409 * but consumerd considers rotation ready when reaching the last
4410 * packet of the current chunk, hence the "rotate_position - 1".
4411 */
f8528c7a 4412
28ab034a
JG
4413 DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
4414 " rotate_position %" PRIu64,
4415 stream->key,
4416 stream->last_sequence_number,
4417 stream->rotate_position);
a40a503f
MD
4418 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4419 return 1;
02d02e31 4420 }
02d02e31 4421
a40a503f 4422 return 0;
02d02e31
JD
4423}
4424
d73bf3d7
JD
4425/*
4426 * Reset the state for a stream after a rotation occurred.
4427 */
4428void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
4429{
28ab034a 4430 DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
a40a503f 4431 stream->rotate_position = -1ULL;
d73bf3d7
JD
4432 stream->rotate_ready = false;
4433}
4434
4435/*
4436 * Perform the rotation of a local stream file.
4437 */
28ab034a 4438static int rotate_local_stream(struct lttng_consumer_stream *stream)
d73bf3d7 4439{
d2956687 4440 int ret = 0;
d73bf3d7 4441
d2956687 4442 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
28ab034a
JG
4443 stream->key,
4444 stream->chan->key);
d73bf3d7 4445 stream->tracefile_size_current = 0;
d2956687 4446 stream->tracefile_count_current = 0;
d73bf3d7 4447
d2956687
JG
4448 if (stream->out_fd >= 0) {
4449 ret = close(stream->out_fd);
4450 if (ret) {
4451 PERROR("Failed to close stream out_fd of channel \"%s\"",
28ab034a 4452 stream->chan->name);
d2956687
JG
4453 }
4454 stream->out_fd = -1;
4455 }
d73bf3d7 4456
d2956687 4457 if (stream->index_file) {
d73bf3d7 4458 lttng_index_file_put(stream->index_file);
cd9adb8b 4459 stream->index_file = nullptr;
d73bf3d7
JD
4460 }
4461
d2956687
JG
4462 if (!stream->trace_chunk) {
4463 goto end;
4464 }
d73bf3d7 4465
d2956687 4466 ret = consumer_stream_create_output_files(stream, true);
d73bf3d7
JD
4467end:
4468 return ret;
d73bf3d7
JD
4469}
4470
d73bf3d7
JD
4471/*
4472 * Performs the stream rotation for the rotate session feature if needed.
d2956687 4473 * It must be called with the channel and stream locks held.
d73bf3d7
JD
4474 *
4475 * Return 0 on success, a negative number on error.
4476 */
f46376a1 4477int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
d73bf3d7
JD
4478{
4479 int ret;
4480
4481 DBG("Consumer rotate stream %" PRIu64, stream->key);
4482
d2956687
JG
4483 /*
4484 * Update the stream's 'current' chunk to the session's (channel)
4485 * now-current chunk.
4486 */
4487 lttng_trace_chunk_put(stream->trace_chunk);
4488 if (stream->chan->trace_chunk == stream->trace_chunk) {
4489 /*
4490 * A channel can be rotated and not have a "next" chunk
4491 * to transition to. In that case, the channel's "current chunk"
4492 * has not been closed yet, but it has not been updated to
4493 * a "next" trace chunk either. Hence, the stream, like its
4494 * parent channel, becomes part of no chunk and can't output
4495 * anything until a new trace chunk is created.
4496 */
cd9adb8b 4497 stream->trace_chunk = nullptr;
28ab034a 4498 } else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
d2956687
JG
4499 ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
4500 ret = -1;
4501 goto error;
4502 } else {
4503 /*
4504 * Update the stream's trace chunk to its parent channel's
4505 * current trace chunk.
4506 */
4507 stream->trace_chunk = stream->chan->trace_chunk;
4508 }
4509
c35f9726 4510 if (stream->net_seq_idx == (uint64_t) -1ULL) {
f46376a1 4511 ret = rotate_local_stream(stream);
c35f9726
JG
4512 if (ret < 0) {
4513 ERR("Failed to rotate stream, ret = %i", ret);
4514 goto error;
4515 }
d73bf3d7
JD
4516 }
4517
d2956687
JG
4518 if (stream->metadata_flag && stream->trace_chunk) {
4519 /*
4520 * If the stream has transitioned to a new trace
4521 * chunk, the metadata should be re-dumped to the
4522 * newest chunk.
4523 *
4524 * However, it is possible for a stream to transition to
4525 * a "no-chunk" state. This can happen if a rotation
4526 * occurs on an inactive session. In such cases, the metadata
4527 * regeneration will happen when the next trace chunk is
4528 * created.
4529 */
4530 ret = consumer_metadata_stream_dump(stream);
4531 if (ret) {
4532 goto error;
d73bf3d7
JD
4533 }
4534 }
4535 lttng_consumer_reset_stream_rotate_state(stream);
4536
4537 ret = 0;
4538
4539error:
4540 return ret;
4541}
4542
b99a8d42
JD
4543/*
4544 * Rotate all the ready streams now.
4545 *
4546 * This is especially important for low throughput streams that have already
4547 * been consumed; we cannot wait for their next packet to perform the
4548 * rotation.
92b7a7f8
MD
4549 * Needs to be called with the RCU read-side lock held to ensure the existence of the
4550 * channel.
b99a8d42
JD
4551 *
4552 * Returns 0 on success, < 0 on error
4553 */
28ab034a 4554int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
b99a8d42
JD
4555{
4556 int ret;
3d46ea1a 4557 const auto ht = the_consumer_data.stream_per_chan_id_ht;
b99a8d42 4558
48b7cdc2
FD
4559 ASSERT_RCU_READ_LOCKED();
4560
b99a8d42
JD
4561 DBG("Consumer rotate ready streams in channel %" PRIu64, key);
4562
3d46ea1a
JG
4563 for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
4564 lttng_consumer_stream,
4565 decltype(lttng_consumer_stream::node_channel_id),
4566 &lttng_consumer_stream::node_channel_id,
4567 std::uint64_t>(*ht->ht,
4568 &channel->key,
4569 ht->hash_fct(&channel->key, lttng_ht_seed),
4570 ht->match_fct)) {
b99a8d42
JD
4571 health_code_update();
4572
3d46ea1a
JG
4573 const lttng::pthread::lock_guard channel_lock(stream->chan->lock);
4574 const lttng::pthread::lock_guard stream_lock(stream->lock);
b99a8d42
JD
4575
4576 if (!stream->rotate_ready) {
b99a8d42
JD
4577 continue;
4578 }
b99a8d42 4579
3d46ea1a 4580 DBG("Consumer rotate ready stream %" PRIu64, stream->key);
f46376a1 4581 ret = lttng_consumer_rotate_stream(stream);
b99a8d42
JD
4582 if (ret) {
4583 goto end;
4584 }
4585 }
4586
4587 ret = 0;
4588
4589end:
b99a8d42
JD
4590 return ret;
4591}
4592
28ab034a
JG
4593enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
4594 const lttng_uuid& sessiond_uuid)
00fb02ac 4595{
d2956687 4596 enum lttcomm_return_code ret;
c70636a7 4597 char uuid_str[LTTNG_UUID_STR_LEN];
00fb02ac 4598
d2956687
JG
4599 if (ctx->sessiond_uuid.is_set) {
4600 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
00fb02ac
JD
4601 goto end;
4602 }
4603
d2956687 4604 ctx->sessiond_uuid.is_set = true;
328c2fe7 4605 ctx->sessiond_uuid.value = sessiond_uuid;
d2956687
JG
4606 ret = LTTCOMM_CONSUMERD_SUCCESS;
4607 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4608 DBG("Received session daemon UUID: %s", uuid_str);
00fb02ac
JD
4609end:
4610 return ret;
4611}
4612
28ab034a
JG
4613enum lttcomm_return_code
4614lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
4615 uint64_t session_id,
4616 uint64_t chunk_id,
4617 time_t chunk_creation_timestamp,
4618 const char *chunk_override_name,
4619 const struct lttng_credentials *credentials,
4620 struct lttng_directory_handle *chunk_directory_handle)
00fb02ac
JD
4621{
4622 int ret;
d2956687 4623 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
cd9adb8b 4624 struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
d2956687
JG
4625 enum lttng_trace_chunk_status chunk_status;
4626 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
4627 char creation_timestamp_buffer[ISO8601_STR_LEN];
4628 const char *relayd_id_str = "(none)";
4629 const char *creation_timestamp_str;
92816cc3 4630
d2956687
JG
4631 if (relayd_id) {
4632 /* Only used for logging purposes. */
28ab034a 4633 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
4634 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
4635 relayd_id_str = relayd_id_buffer;
4636 } else {
4637 relayd_id_str = "(formatting error)";
4638 }
d01ef216 4639 }
d2956687 4640
d01ef216 4641 /* Local protocol error. */
a0377dfe 4642 LTTNG_ASSERT(chunk_creation_timestamp);
d2956687 4643 ret = time_to_iso8601_str(chunk_creation_timestamp,
28ab034a
JG
4644 creation_timestamp_buffer,
4645 sizeof(creation_timestamp_buffer));
4646 creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";
d2956687
JG
4647
4648 DBG("Consumer create trace chunk command: relay_id = %s"
28ab034a
JG
4649 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
4650 ", chunk_creation_timestamp = %s",
4651 relayd_id_str,
4652 session_id,
4653 chunk_id,
4654 chunk_override_name ?: "(none)",
4655 creation_timestamp_str);
92816cc3
JG
4656
4657 /*
d2956687
JG
4658 * The trace chunk registry, as used by the consumer daemon, implicitly
4659 * owns the trace chunks. This is only needed in the consumer since
4660 * the consumer has no notion of a session beyond session IDs being
4661 * used to identify other objects.
4662 *
4663 * The lttng_trace_chunk_registry_publish() call below provides a
4664 * reference which is not released; it implicitly becomes the session
4665 * daemon's reference to the chunk in the consumer daemon.
4666 *
4667 * The lifetime of trace chunks in the consumer daemon is managed by
4668 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
4669 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
92816cc3 4670 */
cd9adb8b 4671 created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
d2956687
JG
4672 if (!created_chunk) {
4673 ERR("Failed to create trace chunk");
4674 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4675 goto error;
d2956687 4676 }
92816cc3 4677
d2956687 4678 if (chunk_override_name) {
28ab034a 4679 chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
d2956687
JG
4680 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4681 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4682 goto error;
92816cc3
JG
4683 }
4684 }
4685
d2956687 4686 if (chunk_directory_handle) {
28ab034a 4687 chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
d2956687
JG
4688 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4689 ERR("Failed to set trace chunk credentials");
4690 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4691 goto error;
d2956687
JG
4692 }
4693 /*
4694 * The consumer daemon has no ownership of the chunk output
4695 * directory.
4696 */
28ab034a 4697 chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
cd9adb8b 4698 chunk_directory_handle = nullptr;
d2956687
JG
4699 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4700 ERR("Failed to set trace chunk's directory handle");
4701 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4702 goto error;
92816cc3
JG
4703 }
4704 }
4705
d2956687 4706 published_chunk = lttng_trace_chunk_registry_publish_chunk(
28ab034a 4707 the_consumer_data.chunk_registry, session_id, created_chunk);
d2956687 4708 lttng_trace_chunk_put(created_chunk);
cd9adb8b 4709 created_chunk = nullptr;
d2956687
JG
4710 if (!published_chunk) {
4711 ERR("Failed to publish trace chunk");
4712 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4713 goto error;
d88744a4
JD
4714 }
4715
3d46ea1a
JG
4716 for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter<
4717 lttng_consumer_channel,
4718 decltype(lttng_consumer_channel::channels_by_session_id_ht_node),
4719 &lttng_consumer_channel::channels_by_session_id_ht_node,
4720 std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht,
4721 &session_id,
4722 the_consumer_data.channels_by_session_id_ht->hash_fct(
4723 &session_id, lttng_ht_seed),
4724 the_consumer_data.channels_by_session_id_ht->match_fct)) {
4725 ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
4726 if (ret) {
4727 /*
4728 * Roll back the creation of this chunk.
4729 *
4730 * This is important since the session daemon will
4731 * assume that the creation of this chunk failed and
4732 * will never ask for it to be closed, resulting
4733 * in a leak and an inconsistent state for some
4734 * channels.
4735 */
4736 enum lttcomm_return_code close_ret;
4737 char path[LTTNG_PATH_MAX];
d2956687 4738
3d46ea1a
JG
4739 DBG("Failed to set new trace chunk on existing channels, rolling back");
4740 close_ret = lttng_consumer_close_trace_chunk(relayd_id,
4741 session_id,
4742 chunk_id,
4743 chunk_creation_timestamp,
4744 nullptr,
4745 path);
4746 if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
4747 ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
4748 ", chunk_id = %" PRIu64,
4749 session_id,
4750 chunk_id);
d2956687 4751 }
3d46ea1a
JG
4752
4753 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
4754 break;
d2956687 4755 }
a1ae2ea5
JD
4756 }
4757
e5add6d0
JG
4758 if (relayd_id) {
4759 struct consumer_relayd_sock_pair *relayd;
4760
4761 relayd = consumer_find_relayd(*relayd_id);
4762 if (relayd) {
4763 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 4764 ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
e5add6d0
JG
4765 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4766 } else {
4767 ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
4768 }
4769
4770 if (!relayd || ret) {
4771 enum lttcomm_return_code close_ret;
ecd1a12f 4772 char path[LTTNG_PATH_MAX];
e5add6d0
JG
4773
4774 close_ret = lttng_consumer_close_trace_chunk(relayd_id,
28ab034a
JG
4775 session_id,
4776 chunk_id,
4777 chunk_creation_timestamp,
cd9adb8b 4778 nullptr,
28ab034a 4779 path);
e5add6d0 4780 if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
28ab034a
JG
4781 ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
4782 ", chunk_id = %" PRIu64,
4783 session_id,
4784 chunk_id);
e5add6d0
JG
4785 }
4786
4787 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4788 goto error_unlock;
e5add6d0
JG
4789 }
4790 }
7ea24db3 4791error_unlock:
7ea24db3 4792error:
d2956687
JG
4793 /* Release the reference returned by the "publish" operation. */
4794 lttng_trace_chunk_put(published_chunk);
9bb5f1f8 4795 lttng_trace_chunk_put(created_chunk);
d2956687 4796 return ret_code;
a1ae2ea5
JD
4797}
4798
28ab034a
JG
4799enum lttcomm_return_code
4800lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
4801 uint64_t session_id,
4802 uint64_t chunk_id,
4803 time_t chunk_close_timestamp,
4804 const enum lttng_trace_chunk_command_type *close_command,
4805 char *path)
a1ae2ea5 4806{
d2956687
JG
4807 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
4808 struct lttng_trace_chunk *chunk;
4809 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
4810 const char *relayd_id_str = "(none)";
bbc4768c 4811 const char *close_command_name = "none";
d2956687 4812 enum lttng_trace_chunk_status chunk_status;
a1ae2ea5 4813
d2956687
JG
4814 if (relayd_id) {
4815 int ret;
4816
4817 /* Only used for logging purposes. */
28ab034a 4818 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
4819 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
4820 relayd_id_str = relayd_id_buffer;
4821 } else {
4822 relayd_id_str = "(formatting error)";
4823 }
bbc4768c
JG
4824 }
4825 if (close_command) {
28ab034a 4826 close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
bbc4768c 4827 }
d2956687
JG
4828
4829 DBG("Consumer close trace chunk command: relayd_id = %s"
28ab034a
JG
4830 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
4831 relayd_id_str,
4832 session_id,
4833 chunk_id,
4834 close_command_name);
bbc4768c 4835
d2956687 4836 chunk = lttng_trace_chunk_registry_find_chunk(
28ab034a 4837 the_consumer_data.chunk_registry, session_id, chunk_id);
bbc4768c 4838 if (!chunk) {
28ab034a
JG
4839 ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
4840 session_id,
4841 chunk_id);
d2956687 4842 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
a1ae2ea5
JD
4843 goto end;
4844 }
4845
28ab034a 4846 chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
d2956687
JG
4847 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4848 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4849 goto end;
45f1d9a1 4850 }
bbc4768c
JG
4851
4852 if (close_command) {
28ab034a 4853 chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
bbc4768c
JG
4854 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4855 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4856 goto end;
4857 }
4858 }
a1ae2ea5 4859
d2956687
JG
4860 /*
4861 * chunk is now invalid to access as we no longer hold a reference to
4862 * it; it is only kept around to compare it (by address) to the
4863 * current chunk found in the session's channels.
4864 */
c3ade133
JG
4865 for (auto *channel :
4866 lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
4867 decltype(lttng_consumer_channel::node),
4868 &lttng_consumer_channel::node>(
4869 *the_consumer_data.channel_ht->ht)) {
4870 int ret;
a1ae2ea5 4871
c3ade133
JG
4872 /*
4873 * Only change the channel's chunk to NULL if it still
4874 * references the chunk being closed. The channel may
 4875		 * reference a newer chunk in the case of a session
4876 * rotation. When a session rotation occurs, the "next"
4877 * chunk is created before the "current" chunk is closed.
4878 */
4879 if (channel->trace_chunk != chunk) {
4880 continue;
4881 }
4882 ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
4883 if (ret) {
d2956687 4884 /*
c3ade133
JG
4885 * Attempt to close the chunk on as many channels as
4886 * possible.
d2956687 4887 */
c3ade133 4888 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
d2956687 4889 }
a1ae2ea5 4890 }
c3ade133 4891
bbc4768c
JG
4892 if (relayd_id) {
4893 int ret;
4894 struct consumer_relayd_sock_pair *relayd;
4895
4896 relayd = consumer_find_relayd(*relayd_id);
4897 if (relayd) {
4898 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 4899 ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
bbc4768c
JG
4900 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4901 } else {
28ab034a 4902 ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
bbc4768c
JG
4903 }
4904
4905 if (!relayd || ret) {
4906 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4907 goto error_unlock;
4908 }
4909 }
4910error_unlock:
d2956687 4911end:
bbc4768c
JG
4912 /*
4913 * Release the reference returned by the "find" operation and
4914 * the session daemon's implicit reference to the chunk.
4915 */
4916 lttng_trace_chunk_put(chunk);
4917 lttng_trace_chunk_put(chunk);
4918
d2956687 4919 return ret_code;
a1ae2ea5 4920}
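/*
 * Editor's sketch, not part of consumer.cpp: a caller-side view of the close
 * operation above when an explicit close command is provided, for instance at
 * the end of a session rotation. The
 * LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED constant is assumed from
 * the lttng_trace_chunk_command_type enumeration; the wrapper name is
 * hypothetical and the block is not compiled.
 */
#if 0
static enum lttcomm_return_code close_completed_chunk(const uint64_t *relayd_id,
						       uint64_t session_id,
						       uint64_t chunk_id)
{
	/* May receive the closed chunk's path on success. */
	char closed_chunk_path[LTTNG_PATH_MAX] = {};
	const enum lttng_trace_chunk_command_type close_command =
		LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED;

	return lttng_consumer_close_trace_chunk(relayd_id,
						session_id,
						chunk_id,
						time(nullptr),
						&close_command,
						closed_chunk_path);
}
#endif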
3654ed19 4921
28ab034a
JG
4922enum lttcomm_return_code
4923lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
3654ed19 4924{
c35f9726 4925 int ret;
d2956687 4926 enum lttcomm_return_code ret_code;
d2956687
JG
4927 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
4928 const char *relayd_id_str = "(none)";
c35f9726 4929 const bool is_local_trace = !relayd_id;
cd9adb8b 4930 struct consumer_relayd_sock_pair *relayd = nullptr;
6b584c2e 4931 bool chunk_exists_local, chunk_exists_remote;
07c4863f 4932 const lttng::urcu::read_lock_guard read_lock;
d2956687
JG
4933
4934 if (relayd_id) {
d2956687 4935 /* Only used for logging purposes. */
28ab034a 4936 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
4937 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
4938 relayd_id_str = relayd_id_buffer;
4939 } else {
4940 relayd_id_str = "(formatting error)";
4941 }
d01ef216 4942 }
d2956687
JG
4943
4944 DBG("Consumer trace chunk exists command: relayd_id = %s"
28ab034a
JG
4945 ", chunk_id = %" PRIu64,
4946 relayd_id_str,
4947 chunk_id);
6b584c2e 4948 ret = lttng_trace_chunk_registry_chunk_exists(
28ab034a 4949 the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
6b584c2e
JG
4950 if (ret) {
4951 /* Internal error. */
4952 ERR("Failed to query the existence of a trace chunk");
4953 ret_code = LTTCOMM_CONSUMERD_FATAL;
13e3b280 4954 goto end;
6b584c2e 4955 }
28ab034a 4956 DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
6b584c2e 4957 if (chunk_exists_local) {
c35f9726 4958 ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
c35f9726
JG
4959 goto end;
4960 } else if (is_local_trace) {
4961 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
4962 goto end;
4963 }
4964
c35f9726
JG
4965 relayd = consumer_find_relayd(*relayd_id);
4966 if (!relayd) {
4967 ERR("Failed to find relayd %" PRIu64, *relayd_id);
4968 ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
4969 goto end_rcu_unlock;
4970 }
4971 DBG("Looking up existence of trace chunk on relay daemon");
4972 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 4973 ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
c35f9726
JG
4974 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4975 if (ret < 0) {
4976 ERR("Failed to look-up the existence of trace chunk on relay daemon");
4977 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
4978 goto end_rcu_unlock;
4979 }
4980
28ab034a
JG
4981 ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
4982 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
4983 DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
d2956687 4984
c35f9726 4985end_rcu_unlock:
c35f9726 4986end:
d2956687 4987 return ret_code;
3654ed19 4988}
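/*
 * Editor's sketch, not part of consumer.cpp: the existence query above reports
 * one of three interesting outcomes (exists locally, exists on the relay
 * daemon, or unknown). This hypothetical helper collapses them into a boolean;
 * error codes are conservatively treated as "unknown". Not compiled.
 */
#if 0
static bool trace_chunk_is_known(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
{
	switch (lttng_consumer_trace_chunk_exists(relayd_id, session_id, chunk_id)) {
	case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL:
	case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE:
		return true;
	case LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK:
	default:
		return false;
	}
}
#endif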
5f3aff8b 4989
28ab034a 4990static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
5f3aff8b 4991{
5f3aff8b 4992 int ret;
3d46ea1a
JG
4993 const auto ht = the_consumer_data.stream_per_chan_id_ht;
4994
4995 for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
4996 lttng_consumer_stream,
4997 decltype(lttng_consumer_stream::node_channel_id),
4998 &lttng_consumer_stream::node_channel_id,
4999 std::uint64_t>(*ht->ht,
5000 &channel->key,
5001 ht->hash_fct(&channel->key, lttng_ht_seed),
5002 ht->match_fct)) {
5f3aff8b
MD
5003 /*
5004 * Protect against teardown with mutex.
5005 */
3d46ea1a 5006 const lttng::pthread::lock_guard stream_lock(stream->lock);
5f3aff8b 5007 if (cds_lfht_is_node_deleted(&stream->node.node)) {
3d46ea1a 5008 continue;
5f3aff8b 5009 }
3d46ea1a 5010
5f3aff8b
MD
5011 ret = consumer_clear_stream(stream);
5012 if (ret) {
3d46ea1a 5013 return ret;
5f3aff8b 5014 }
5f3aff8b 5015 }
5f3aff8b 5016
3d46ea1a 5017 return LTTCOMM_CONSUMERD_SUCCESS;
5f3aff8b
MD
5018}
5019
5020int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
5021{
5022 int ret;
5023
5024 DBG("Consumer clear channel %" PRIu64, channel->key);
5025
5026 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
5027 /*
5028 * Nothing to do for the metadata channel/stream.
 5029		 * The snapshot mechanism already takes care of metadata
 5030		 * handling/generation, and monitored channels only need to
 5031		 * have their data streams cleared.
5032 */
5033 ret = LTTCOMM_CONSUMERD_SUCCESS;
5034 goto end;
5035 }
5036
5037 if (!channel->monitor) {
5038 ret = consumer_clear_unmonitored_channel(channel);
5039 } else {
5040 ret = consumer_clear_monitored_channel(channel);
5041 }
5042end:
5043 return ret;
5044}
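/*
 * Editor's sketch, not part of consumer.cpp: lttng_consumer_clear_channel()
 * returns lttcomm_return_code values through an int and treats metadata
 * channels as a no-op success, so a command handler can pass it any channel.
 * The wrapper name is hypothetical and the block is not compiled.
 */
#if 0
static enum lttcomm_return_code handle_clear_channel(struct lttng_consumer_channel *channel)
{
	return static_cast<enum lttcomm_return_code>(lttng_consumer_clear_channel(channel));
}
#endif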
04ed9e10 5045
28ab034a 5046enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
04ed9e10 5047{
04ed9e10
JG
5048 enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
5049
5050 if (channel->metadata_stream) {
5051 ERR("Open channel packets command attempted on a metadata channel");
a1a1df65 5052 return LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
04ed9e10
JG
5053 }
5054
a1a1df65
JG
5055 const lttng::urcu::read_lock_guard read_lock;
5056 for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
5057 &lttng_consumer_stream::send_node>(
5058 channel->streams.head)) {
5059 enum consumer_stream_open_packet_status status;
04ed9e10 5060
a1a1df65
JG
5061 const lttng::pthread::lock_guard stream_lock(stream->lock);
5062 if (cds_lfht_is_node_deleted(&stream->node.node)) {
5063 continue;
5064 }
04ed9e10 5065
a1a1df65
JG
5066 status = consumer_stream_open_packet(stream);
5067 switch (status) {
5068 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
5069 DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
5070 ", channel name = %s, session id = %" PRIu64,
5071 stream->key,
5072 stream->chan->name,
5073 stream->chan->session_id);
5074 stream->opened_packet_in_current_trace_chunk = true;
5075 break;
5076 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
5077 DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
5078 ", channel name = %s, session id = %" PRIu64,
5079 stream->key,
5080 stream->chan->name,
5081 stream->chan->session_id);
5082 break;
5083 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
5084 /*
5085 * Only unexpected internal errors can lead to this
5086 * failing. Report an unknown error.
5087 */
5088 ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
5089 ", channel id = %" PRIu64 ", channel name = %s"
5090 ", session id = %" PRIu64,
5091 stream->key,
5092 channel->key,
5093 channel->name,
5094 channel->session_id);
5095 return LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
5096 default:
5097 abort();
56047f5a 5098 }
04ed9e10 5099 }
04ed9e10 5100
a1a1df65 5101 return ret;
04ed9e10 5102}
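/*
 * Editor's sketch, not part of consumer.cpp: "open channel packets" applies to
 * data channels only (metadata channels are rejected above), and a caller
 * typically just propagates the returned code. The wrapper name and log
 * message are hypothetical; the block is not compiled.
 */
#if 0
static enum lttcomm_return_code open_packets_for_channel(struct lttng_consumer_channel *channel)
{
	const enum lttcomm_return_code ret = lttng_consumer_open_channel_packets(channel);

	if (ret != LTTCOMM_CONSUMERD_SUCCESS) {
		ERR("\"open channel packets\" failed: channel key = %" PRIu64 ", name = %s",
		    channel->key,
		    channel->name);
	}
	return ret;
}
#endif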
881fc67f
MD
5103
5104void lttng_consumer_sigbus_handle(void *addr)
5105{
5106 lttng_ustconsumer_sigbus_handle(addr);
5107}
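/*
 * Editor's sketch, not part of consumer.cpp: lttng_consumer_sigbus_handle()
 * expects the faulting address, which a SIGBUS handler installed with
 * sigaction() and SA_SIGINFO obtains from siginfo_t's si_addr field. The
 * handler below and its installation are illustrative only; the consumer
 * daemon's actual signal setup lives elsewhere. Not compiled.
 */
#if 0
#include <signal.h>

static void sigbus_action_handler(int, siginfo_t *info, void *)
{
	lttng_consumer_sigbus_handle(info->si_addr);
}

static int install_sigbus_handler()
{
	struct sigaction sa = {};

	sa.sa_flags = SA_SIGINFO;
	sa.sa_sigaction = sigbus_action_handler;
	sigemptyset(&sa.sa_mask);

	return sigaction(SIGBUS, &sa, nullptr);
}
#endif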