Replace explicit rcu_read_lock/unlock with lttng::urcu::read_lock_guard
[lttng-tools.git] / src / common / consumer / consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
c9e313bc
SM
11#include <common/align.hpp>
12#include <common/common.hpp>
13#include <common/compat/endian.hpp>
14#include <common/compat/poll.hpp>
15#include <common/consumer/consumer-metadata-cache.hpp>
16#include <common/consumer/consumer-stream.hpp>
17#include <common/consumer/consumer-testpoint.hpp>
18#include <common/consumer/consumer-timer.hpp>
19#include <common/consumer/consumer.hpp>
20#include <common/dynamic-array.hpp>
21#include <common/index/ctf-index.hpp>
22#include <common/index/index.hpp>
23#include <common/kernel-consumer/kernel-consumer.hpp>
24#include <common/kernel-ctl/kernel-ctl.hpp>
25#include <common/relayd/relayd.hpp>
26#include <common/sessiond-comm/relayd.hpp>
27#include <common/sessiond-comm/sessiond-comm.hpp>
28#include <common/string-utils/format.hpp>
29#include <common/time.hpp>
30#include <common/trace-chunk-registry.hpp>
31#include <common/trace-chunk.hpp>
56047f5a 32#include <common/urcu.hpp>
c9e313bc
SM
33#include <common/ust-consumer/ust-consumer.hpp>
34#include <common/utils.hpp>
3bd1e081 35
28ab034a
JG
36#include <bin/lttng-consumerd/health-consumerd.hpp>
37#include <inttypes.h>
38#include <poll.h>
39#include <pthread.h>
40#include <signal.h>
41#include <stdlib.h>
42#include <string.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <unistd.h>
47
97535efa 48lttng_consumer_global_data the_consumer_data;
3bd1e081 49
d8ef542d
MD
50enum consumer_channel_action {
51 CONSUMER_CHANNEL_ADD,
a0cbdd2e 52 CONSUMER_CHANNEL_DEL,
d8ef542d
MD
53 CONSUMER_CHANNEL_QUIT,
54};
55
f1494934 56namespace {
d8ef542d
MD
57struct consumer_channel_msg {
58 enum consumer_channel_action action;
28ab034a
JG
59 struct lttng_consumer_channel *chan; /* add */
60 uint64_t key; /* del */
d8ef542d
MD
61};
62
f1494934
JG
63/*
64 * Global hash table containing respectively metadata and data streams. The
65 * stream element in this ht should only be updated by the metadata poll thread
66 * for the metadata and the data poll thread for the data.
67 */
68struct lttng_ht *metadata_ht;
69struct lttng_ht *data_ht;
70} /* namespace */
71
/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;

/*
 * Flag instructing the polling threads to quit once every fd has hung up.
 * Written by consumer_thread_receive_fds when it notices that all fds have
 * hung up, and by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
int consumer_quit;
3bd1e081 82
cd9adb8b 83static const char *get_consumer_domain()
5da88b0f 84{
fa29bfbf 85 switch (the_consumer_data.type) {
5da88b0f
MD
86 case LTTNG_CONSUMER_KERNEL:
87 return DEFAULT_KERNEL_TRACE_DIR;
88 case LTTNG_CONSUMER64_UST:
89 /* Fall-through. */
90 case LTTNG_CONSUMER32_UST:
91 return DEFAULT_UST_TRACE_DIR;
92 default:
93 abort();
94 }
95}
96
acdb9057
DG
97/*
98 * Notify a thread lttng pipe to poll back again. This usually means that some
99 * global state has changed so we just send back the thread in a poll wait
100 * call.
101 */
102static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
103{
cd9adb8b 104 struct lttng_consumer_stream *null_stream = nullptr;
acdb9057 105
a0377dfe 106 LTTNG_ASSERT(pipe);
acdb9057 107
5c7248cd
JG
108 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
109 pointer. */
acdb9057
DG
110}
111
5c635c72
MD
112static void notify_health_quit_pipe(int *pipe)
113{
6cd525e8 114 ssize_t ret;
5c635c72 115
6cd525e8
MD
116 ret = lttng_write(pipe[1], "4", 1);
117 if (ret < 1) {
5c635c72
MD
118 PERROR("write consumer health quit");
119 }
120}
121
d8ef542d 122static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
123 struct lttng_consumer_channel *chan,
124 uint64_t key,
125 enum consumer_channel_action action)
d8ef542d
MD
126{
127 struct consumer_channel_msg msg;
6cd525e8 128 ssize_t ret;
d8ef542d 129
e56251fc
DG
130 memset(&msg, 0, sizeof(msg));
131
d8ef542d
MD
132 msg.action = action;
133 msg.chan = chan;
f21dae48 134 msg.key = key;
6cd525e8
MD
135 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
136 if (ret < sizeof(msg)) {
137 PERROR("notify_channel_pipe write error");
138 }
d8ef542d
MD
139}
140
28ab034a 141void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
a0cbdd2e 142{
cd9adb8b 143 notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
a0cbdd2e
MD
144}
145
d8ef542d 146static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
147 struct lttng_consumer_channel **chan,
148 uint64_t *key,
149 enum consumer_channel_action *action)
d8ef542d
MD
150{
151 struct consumer_channel_msg msg;
6cd525e8 152 ssize_t ret;
d8ef542d 153
6cd525e8
MD
154 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
155 if (ret < sizeof(msg)) {
156 ret = -1;
157 goto error;
d8ef542d 158 }
6cd525e8
MD
159 *action = msg.action;
160 *chan = msg.chan;
161 *key = msg.key;
162error:
163 return (int) ret;
d8ef542d
MD
164}
165
212d67a2
DG
166/*
167 * Cleanup the stream list of a channel. Those streams are not yet globally
168 * visible
169 */
170static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
171{
172 struct lttng_consumer_stream *stream, *stmp;
173
a0377dfe 174 LTTNG_ASSERT(channel);
212d67a2
DG
175
176 /* Delete streams that might have been left in the stream list. */
28ab034a 177 cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
212d67a2
DG
178 /*
179 * Once a stream is added to this list, the buffers were created so we
180 * have a guarantee that this call will succeed. Setting the monitor
181 * mode to 0 so we don't lock nor try to delete the stream from the
182 * global hash table.
183 */
184 stream->monitor = 0;
cd9adb8b 185 consumer_stream_destroy(stream, nullptr);
212d67a2
DG
186 }
187}
188
3bd1e081
MD
189/*
190 * Find a stream. The consumer_data.lock must be locked during this
191 * call.
192 */
28ab034a 193static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
3bd1e081 194{
e4421fec 195 struct lttng_ht_iter iter;
d88aee68 196 struct lttng_ht_node_u64 *node;
cd9adb8b 197 struct lttng_consumer_stream *stream = nullptr;
3bd1e081 198
a0377dfe 199 LTTNG_ASSERT(ht);
8389e4f8 200
d88aee68
DG
201 /* -1ULL keys are lookup failures */
202 if (key == (uint64_t) -1ULL) {
cd9adb8b 203 return nullptr;
7a57cf92 204 }
e4421fec 205
56047f5a 206 lttng::urcu::read_lock_guard read_lock;
6065ceec 207
d88aee68
DG
208 lttng_ht_lookup(ht, &key, &iter);
209 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 210 if (node != nullptr) {
0114db0e 211 stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
3bd1e081 212 }
e4421fec
DG
213
214 return stream;
3bd1e081
MD
215}
216
da009f2c 217static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
7ad0a0cb
MD
218{
219 struct lttng_consumer_stream *stream;
220
56047f5a 221 lttng::urcu::read_lock_guard read_lock;
ffe60014 222 stream = find_stream(key, ht);
04253271 223 if (stream) {
da009f2c 224 stream->key = (uint64_t) -1ULL;
04253271
MD
225 /*
226 * We don't want the lookup to match, but we still need
227 * to iterate on this stream when iterating over the hash table. Just
228 * change the node key.
229 */
da009f2c 230 stream->node.key = (uint64_t) -1ULL;
04253271 231 }
7ad0a0cb
MD
232}
233
d56db448
DG
234/*
235 * Return a channel object for the given key.
236 *
237 * RCU read side lock MUST be acquired before calling this function and
238 * protects the channel ptr.
239 */
d88aee68 240struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
3bd1e081 241{
e4421fec 242 struct lttng_ht_iter iter;
d88aee68 243 struct lttng_ht_node_u64 *node;
cd9adb8b 244 struct lttng_consumer_channel *channel = nullptr;
3bd1e081 245
48b7cdc2
FD
246 ASSERT_RCU_READ_LOCKED();
247
d88aee68
DG
248 /* -1ULL keys are lookup failures */
249 if (key == (uint64_t) -1ULL) {
cd9adb8b 250 return nullptr;
7a57cf92 251 }
e4421fec 252
fa29bfbf 253 lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
d88aee68 254 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 255 if (node != nullptr) {
0114db0e 256 channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
3bd1e081 257 }
e4421fec
DG
258
259 return channel;
3bd1e081
MD
260}
261
b5a6470f
DG
262/*
263 * There is a possibility that the consumer does not have enough time between
264 * the close of the channel on the session daemon and the cleanup in here thus
265 * once we have a channel add with an existing key, we know for sure that this
266 * channel will eventually get cleaned up by all streams being closed.
267 *
268 * This function just nullifies the already existing channel key.
269 */
270static void steal_channel_key(uint64_t key)
271{
272 struct lttng_consumer_channel *channel;
273
56047f5a 274 lttng::urcu::read_lock_guard read_lock;
b5a6470f
DG
275 channel = consumer_find_channel(key);
276 if (channel) {
277 channel->key = (uint64_t) -1ULL;
278 /*
279 * We don't want the lookup to match, but we still need to iterate on
280 * this channel when iterating over the hash table. Just change the
281 * node key.
282 */
283 channel->node.key = (uint64_t) -1ULL;
284 }
b5a6470f
DG
285}
286
ffe60014 287static void free_channel_rcu(struct rcu_head *head)
702b1ea4 288{
28ab034a 289 struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
ffe60014 290 struct lttng_consumer_channel *channel =
0114db0e 291 lttng::utils::container_of(node, &lttng_consumer_channel::node);
702b1ea4 292
fa29bfbf 293 switch (the_consumer_data.type) {
b83e03c4
MD
294 case LTTNG_CONSUMER_KERNEL:
295 break;
296 case LTTNG_CONSUMER32_UST:
297 case LTTNG_CONSUMER64_UST:
298 lttng_ustconsumer_free_channel(channel);
299 break;
300 default:
301 ERR("Unknown consumer_data type");
302 abort();
303 }
ffe60014 304 free(channel);
702b1ea4
MD
305}
306
00e2e675
DG
307/*
308 * RCU protected relayd socket pair free.
309 */
ffe60014 310static void free_relayd_rcu(struct rcu_head *head)
00e2e675 311{
28ab034a 312 struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
00e2e675 313 struct consumer_relayd_sock_pair *relayd =
0114db0e 314 lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
00e2e675 315
8994307f
DG
316 /*
317 * Close all sockets. This is done in the call RCU since we don't want the
318 * socket fds to be reassigned thus potentially creating bad state of the
319 * relayd object.
320 *
321 * We do not have to lock the control socket mutex here since at this stage
322 * there is no one referencing to this relayd object.
323 */
324 (void) relayd_close(&relayd->control_sock);
325 (void) relayd_close(&relayd->data_sock);
326
3a84e2f3 327 pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
00e2e675
DG
328 free(relayd);
329}
330
331/*
332 * Destroy and free relayd socket pair object.
00e2e675 333 */
51230d70 334void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
335{
336 int ret;
337 struct lttng_ht_iter iter;
338
cd9adb8b 339 if (relayd == nullptr) {
173af62f
DG
340 return;
341 }
342
00e2e675
DG
343 DBG("Consumer destroy and close relayd socket pair");
344
345 iter.iter.node = &relayd->node.node;
fa29bfbf 346 ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
173af62f 347 if (ret != 0) {
8994307f 348 /* We assume the relayd is being or is destroyed */
173af62f
DG
349 return;
350 }
00e2e675 351
00e2e675 352 /* RCU free() call */
ffe60014
DG
353 call_rcu(&relayd->node.head, free_relayd_rcu);
354}
355
356/*
357 * Remove a channel from the global list protected by a mutex. This function is
358 * also responsible for freeing its data structures.
359 */
360void consumer_del_channel(struct lttng_consumer_channel *channel)
361{
ffe60014
DG
362 struct lttng_ht_iter iter;
363
d88aee68 364 DBG("Consumer delete channel key %" PRIu64, channel->key);
ffe60014 365
fa29bfbf 366 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 367 pthread_mutex_lock(&channel->lock);
ffe60014 368
212d67a2
DG
369 /* Destroy streams that might have been left in the stream list. */
370 clean_channel_stream_list(channel);
51e762e5 371
d3e2ba59
JD
372 if (channel->live_timer_enabled == 1) {
373 consumer_timer_live_stop(channel);
374 }
e9404c27
JG
375 if (channel->monitor_timer_enabled == 1) {
376 consumer_timer_monitor_stop(channel);
377 }
d3e2ba59 378
319dcddc
JG
379 /*
380 * Send a last buffer statistics sample to the session daemon
381 * to ensure it tracks the amount of data consumed by this channel.
382 */
383 sample_and_send_channel_buffer_stats(channel);
384
fa29bfbf 385 switch (the_consumer_data.type) {
ffe60014
DG
386 case LTTNG_CONSUMER_KERNEL:
387 break;
388 case LTTNG_CONSUMER32_UST:
389 case LTTNG_CONSUMER64_UST:
390 lttng_ustconsumer_del_channel(channel);
391 break;
392 default:
393 ERR("Unknown consumer_data type");
a0377dfe 394 abort();
ffe60014
DG
395 goto end;
396 }
397
d2956687 398 lttng_trace_chunk_put(channel->trace_chunk);
cd9adb8b 399 channel->trace_chunk = nullptr;
5c3892a6 400
d2956687
JG
401 if (channel->is_published) {
402 int ret;
403
56047f5a 404 lttng::urcu::read_lock_guard read_lock;
d2956687 405 iter.iter.node = &channel->node.node;
fa29bfbf 406 ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
a0377dfe 407 LTTNG_ASSERT(!ret);
ffe60014 408
d2956687 409 iter.iter.node = &channel->channels_by_session_id_ht_node.node;
28ab034a 410 ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
a0377dfe 411 LTTNG_ASSERT(!ret);
d2956687
JG
412 }
413
b6921a17
JG
414 channel->is_deleted = true;
415 call_rcu(&channel->node.head, free_channel_rcu);
ffe60014 416end:
a9838785 417 pthread_mutex_unlock(&channel->lock);
fa29bfbf 418 pthread_mutex_unlock(&the_consumer_data.lock);
00e2e675
DG
419}
420
228b5bf7
DG
421/*
422 * Iterate over the relayd hash table and destroy each element. Finally,
423 * destroy the whole hash table.
424 */
cd9adb8b 425static void cleanup_relayd_ht()
228b5bf7
DG
426{
427 struct lttng_ht_iter iter;
428 struct consumer_relayd_sock_pair *relayd;
429
56047f5a
JG
430 {
431 lttng::urcu::read_lock_guard read_lock;
228b5bf7 432
56047f5a
JG
433 cds_lfht_for_each_entry (
434 the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
435 consumer_destroy_relayd(relayd);
436 }
228b5bf7
DG
437 }
438
fa29bfbf 439 lttng_ht_destroy(the_consumer_data.relayd_ht);
228b5bf7
DG
440}
441
8994307f
DG
442/*
443 * Update the end point status of all streams having the given network sequence
444 * index (relayd index).
445 *
446 * It's atomically set without having the stream mutex locked which is fine
447 * because we handle the write/read race with a pipe wakeup for each thread.
448 */
da009f2c 449static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
28ab034a 450 enum consumer_endpoint_status status)
8994307f
DG
451{
452 struct lttng_ht_iter iter;
453 struct lttng_consumer_stream *stream;
454
da009f2c 455 DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
8994307f 456
56047f5a 457 lttng::urcu::read_lock_guard read_lock;
8994307f
DG
458
459 /* Let's begin with metadata */
28ab034a 460 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
461 if (stream->net_seq_idx == net_seq_idx) {
462 uatomic_set(&stream->endpoint_status, status);
463 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
464 }
465 }
466
467 /* Follow up by the data streams */
28ab034a 468 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
469 if (stream->net_seq_idx == net_seq_idx) {
470 uatomic_set(&stream->endpoint_status, status);
471 DBG("Delete flag set to data stream %d", stream->wait_fd);
472 }
473 }
8994307f
DG
474}
475
476/*
477 * Cleanup a relayd object by flagging every associated streams for deletion,
478 * destroying the object meaning removing it from the relayd hash table,
479 * closing the sockets and freeing the memory in a RCU call.
480 *
481 * If a local data context is available, notify the threads that the streams'
482 * state have changed.
483 */
9276e5c8 484void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
8994307f 485{
da009f2c 486 uint64_t netidx;
8994307f 487
a0377dfe 488 LTTNG_ASSERT(relayd);
8994307f 489
97535efa 490 DBG("Cleaning up relayd object ID %" PRIu64, relayd->net_seq_idx);
9617607b 491
8994307f
DG
492 /* Save the net sequence index before destroying the object */
493 netidx = relayd->net_seq_idx;
494
495 /*
496 * Delete the relayd from the relayd hash table, close the sockets and free
497 * the object in a RCU call.
498 */
51230d70 499 consumer_destroy_relayd(relayd);
8994307f
DG
500
501 /* Set inactive endpoint to all streams */
502 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
503
504 /*
505 * With a local data context, notify the threads that the streams' state
506 * have changed. The write() action on the pipe acts as an "implicit"
507 * memory barrier ordering the updates of the end point status from the
508 * read of this status which happens AFTER receiving this notify.
509 */
9276e5c8
JR
510 notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
511 notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
8994307f
DG
512}
513
a6ba4fe1
DG
514/*
515 * Flag a relayd socket pair for destruction. Destroy it if the refcount
516 * reaches zero.
517 *
518 * RCU read side lock MUST be aquired before calling this function.
519 */
520void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
521{
a0377dfe 522 LTTNG_ASSERT(relayd);
48b7cdc2 523 ASSERT_RCU_READ_LOCKED();
a6ba4fe1
DG
524
525 /* Set destroy flag for this object */
526 uatomic_set(&relayd->destroy_flag, 1);
527
528 /* Destroy the relayd if refcount is 0 */
529 if (uatomic_read(&relayd->refcount) == 0) {
51230d70 530 consumer_destroy_relayd(relayd);
a6ba4fe1
DG
531 }
532}
533
3bd1e081 534/*
1d1a276c
DG
535 * Completly destroy stream from every visiable data structure and the given
536 * hash table if one.
537 *
538 * One this call returns, the stream object is not longer usable nor visible.
3bd1e081 539 */
28ab034a 540void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
3bd1e081 541{
1d1a276c 542 consumer_stream_destroy(stream, ht);
3bd1e081
MD
543}
544
5ab66908
MD
545/*
546 * XXX naming of del vs destroy is all mixed up.
547 */
548void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
549{
550 consumer_stream_destroy(stream, data_ht);
551}
552
553void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
554{
555 consumer_stream_destroy(stream, metadata_ht);
556}
557
28ab034a
JG
558void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
559 struct lttng_consumer_channel *channel)
d9a2e16e 560{
28ab034a 561 stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
d9a2e16e
JD
562}
563
3bd1e081
MD
564/*
565 * Add a stream to the global list protected by a mutex.
566 */
66d583dc 567void consumer_add_data_stream(struct lttng_consumer_stream *stream)
3bd1e081 568{
5ab66908 569 struct lttng_ht *ht = data_ht;
3bd1e081 570
a0377dfe
FD
571 LTTNG_ASSERT(stream);
572 LTTNG_ASSERT(ht);
c77fc10a 573
d88aee68 574 DBG3("Adding consumer stream %" PRIu64, stream->key);
e316aad5 575
fa29bfbf 576 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 577 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 578 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 579 pthread_mutex_lock(&stream->lock);
56047f5a 580 lttng::urcu::read_lock_guard read_lock;
e316aad5 581
43c34bc3 582 /* Steal stream identifier to avoid having streams with the same key */
ffe60014 583 steal_stream_key(stream->key, ht);
43c34bc3 584
d88aee68 585 lttng_ht_add_unique_u64(ht, &stream->node);
00e2e675 586
28ab034a 587 lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
d8ef542d 588
ca22feea
DG
589 /*
590 * Add stream to the stream_list_ht of the consumer data. No need to steal
591 * the key since the HT does not use it and we allow to add redundant keys
592 * into this table.
593 */
28ab034a 594 lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 595
e316aad5 596 /*
ffe60014
DG
597 * When nb_init_stream_left reaches 0, we don't need to trigger any action
598 * in terms of destroying the associated channel, because the action that
e316aad5
DG
599 * causes the count to become 0 also causes a stream to be added. The
600 * channel deletion will thus be triggered by the following removal of this
601 * stream.
602 */
ffe60014 603 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
604 /* Increment refcount before decrementing nb_init_stream_left */
605 cmm_smp_wmb();
ffe60014 606 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
607 }
608
609 /* Update consumer data once the node is inserted. */
fa29bfbf
SM
610 the_consumer_data.stream_count++;
611 the_consumer_data.need_update = 1;
3bd1e081 612
2e818a6a 613 pthread_mutex_unlock(&stream->lock);
ec6ea7d0 614 pthread_mutex_unlock(&stream->chan->timer_lock);
a9838785 615 pthread_mutex_unlock(&stream->chan->lock);
fa29bfbf 616 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
617}
618
00e2e675 619/*
3f8e211f
DG
620 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
621 * be acquired before calling this.
00e2e675 622 */
d09e1200 623static int add_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
624{
625 int ret = 0;
d88aee68 626 struct lttng_ht_node_u64 *node;
00e2e675
DG
627 struct lttng_ht_iter iter;
628
a0377dfe 629 LTTNG_ASSERT(relayd);
48b7cdc2 630 ASSERT_RCU_READ_LOCKED();
00e2e675 631
28ab034a 632 lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
d88aee68 633 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 634 if (node != nullptr) {
00e2e675
DG
635 goto end;
636 }
fa29bfbf 637 lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
00e2e675 638
00e2e675
DG
639end:
640 return ret;
641}
642
643/*
644 * Allocate and return a consumer relayd socket.
645 */
28ab034a 646static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
00e2e675 647{
cd9adb8b 648 struct consumer_relayd_sock_pair *obj = nullptr;
00e2e675 649
da009f2c
MD
650 /* net sequence index of -1 is a failure */
651 if (net_seq_idx == (uint64_t) -1ULL) {
00e2e675
DG
652 goto error;
653 }
654
64803277 655 obj = zmalloc<consumer_relayd_sock_pair>();
cd9adb8b 656 if (obj == nullptr) {
00e2e675
DG
657 PERROR("zmalloc relayd sock");
658 goto error;
659 }
660
661 obj->net_seq_idx = net_seq_idx;
662 obj->refcount = 0;
173af62f 663 obj->destroy_flag = 0;
f96e4545
MD
664 obj->control_sock.sock.fd = -1;
665 obj->data_sock.sock.fd = -1;
d88aee68 666 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
cd9adb8b 667 pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
00e2e675
DG
668
669error:
670 return obj;
671}
672
673/*
674 * Find a relayd socket pair in the global consumer data.
675 *
676 * Return the object if found else NULL.
b0b335c8
MD
677 * RCU read-side lock must be held across this call and while using the
678 * returned object.
00e2e675 679 */
d88aee68 680struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
00e2e675
DG
681{
682 struct lttng_ht_iter iter;
d88aee68 683 struct lttng_ht_node_u64 *node;
cd9adb8b 684 struct consumer_relayd_sock_pair *relayd = nullptr;
00e2e675 685
48b7cdc2
FD
686 ASSERT_RCU_READ_LOCKED();
687
00e2e675 688 /* Negative keys are lookup failures */
d88aee68 689 if (key == (uint64_t) -1ULL) {
00e2e675
DG
690 goto error;
691 }
692
fa29bfbf 693 lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
d88aee68 694 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 695 if (node != nullptr) {
0114db0e 696 relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
00e2e675
DG
697 }
698
00e2e675
DG
699error:
700 return relayd;
701}
702
10a50311
JD
703/*
704 * Find a relayd and send the stream
705 *
706 * Returns 0 on success, < 0 on error
707 */
28ab034a 708int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
10a50311
JD
709{
710 int ret = 0;
711 struct consumer_relayd_sock_pair *relayd;
712
a0377dfe
FD
713 LTTNG_ASSERT(stream);
714 LTTNG_ASSERT(stream->net_seq_idx != -1ULL);
715 LTTNG_ASSERT(path);
10a50311
JD
716
717 /* The stream is not metadata. Get relayd reference if exists. */
56047f5a 718 lttng::urcu::read_lock_guard read_lock;
10a50311 719 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 720 if (relayd != nullptr) {
10a50311
JD
721 /* Add stream on the relayd */
722 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a
JG
723 ret = relayd_add_stream(&relayd->control_sock,
724 stream->name,
725 get_consumer_domain(),
726 path,
727 &stream->relayd_stream_id,
728 stream->chan->tracefile_size,
729 stream->chan->tracefile_count,
730 stream->trace_chunk);
10a50311
JD
731 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
732 if (ret < 0) {
28ab034a
JG
733 ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
734 relayd->net_seq_idx);
9276e5c8 735 lttng_consumer_cleanup_relayd(relayd);
10a50311
JD
736 goto end;
737 }
1c20f0e2 738
10a50311 739 uatomic_inc(&relayd->refcount);
d01178b6 740 stream->sent_to_relayd = 1;
10a50311
JD
741 } else {
742 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
28ab034a
JG
743 stream->key,
744 stream->net_seq_idx);
10a50311
JD
745 ret = -1;
746 goto end;
747 }
748
749 DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
28ab034a
JG
750 stream->name,
751 stream->key,
752 stream->net_seq_idx);
10a50311
JD
753
754end:
10a50311
JD
755 return ret;
756}
757
a4baae1b
JD
758/*
759 * Find a relayd and send the streams sent message
760 *
761 * Returns 0 on success, < 0 on error
762 */
763int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
764{
765 int ret = 0;
766 struct consumer_relayd_sock_pair *relayd;
767
a0377dfe 768 LTTNG_ASSERT(net_seq_idx != -1ULL);
a4baae1b
JD
769
770 /* The stream is not metadata. Get relayd reference if exists. */
56047f5a 771 lttng::urcu::read_lock_guard read_lock;
a4baae1b 772 relayd = consumer_find_relayd(net_seq_idx);
cd9adb8b 773 if (relayd != nullptr) {
a4baae1b
JD
774 /* Add stream on the relayd */
775 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
776 ret = relayd_streams_sent(&relayd->control_sock);
777 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
778 if (ret < 0) {
28ab034a
JG
779 ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
780 relayd->net_seq_idx);
9276e5c8 781 lttng_consumer_cleanup_relayd(relayd);
a4baae1b
JD
782 goto end;
783 }
784 } else {
28ab034a 785 ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
a4baae1b
JD
786 ret = -1;
787 goto end;
788 }
789
790 ret = 0;
791 DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
792
793end:
a4baae1b
JD
794 return ret;
795}
796
10a50311
JD
797/*
798 * Find a relayd and close the stream
799 */
800void close_relayd_stream(struct lttng_consumer_stream *stream)
801{
802 struct consumer_relayd_sock_pair *relayd;
803
804 /* The stream is not metadata. Get relayd reference if exists. */
56047f5a 805 lttng::urcu::read_lock_guard read_lock;
10a50311
JD
806 relayd = consumer_find_relayd(stream->net_seq_idx);
807 if (relayd) {
808 consumer_stream_relayd_close(stream, relayd);
809 }
10a50311
JD
810}
811
00e2e675
DG
812/*
813 * Handle stream for relayd transmission if the stream applies for network
814 * streaming where the net sequence index is set.
815 *
816 * Return destination file descriptor or negative value on error.
817 */
6197aea7 818static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
28ab034a
JG
819 size_t data_size,
820 unsigned long padding,
821 struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
822{
823 int outfd = -1, ret;
00e2e675
DG
824 struct lttcomm_relayd_data_hdr data_hdr;
825
826 /* Safety net */
a0377dfe
FD
827 LTTNG_ASSERT(stream);
828 LTTNG_ASSERT(relayd);
00e2e675
DG
829
830 /* Reset data header */
831 memset(&data_hdr, 0, sizeof(data_hdr));
832
00e2e675
DG
833 if (stream->metadata_flag) {
834 /* Caller MUST acquire the relayd control socket lock */
835 ret = relayd_send_metadata(&relayd->control_sock, data_size);
836 if (ret < 0) {
837 goto error;
838 }
839
840 /* Metadata are always sent on the control socket. */
6151a90f 841 outfd = relayd->control_sock.sock.fd;
00e2e675
DG
842 } else {
843 /* Set header with stream information */
844 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
845 data_hdr.data_size = htobe32(data_size);
1d4dfdef 846 data_hdr.padding_size = htobe32(padding);
c35f9726 847
39df6d9f
DG
848 /*
849 * Note that net_seq_num below is assigned with the *current* value of
850 * next_net_seq_num and only after that the next_net_seq_num will be
851 * increment. This is why when issuing a command on the relayd using
852 * this next value, 1 should always be substracted in order to compare
853 * the last seen sequence number on the relayd side to the last sent.
854 */
3604f373 855 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
00e2e675
DG
856 /* Other fields are zeroed previously */
857
28ab034a 858 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
00e2e675
DG
859 if (ret < 0) {
860 goto error;
861 }
862
3604f373
DG
863 ++stream->next_net_seq_num;
864
00e2e675 865 /* Set to go on data socket */
6151a90f 866 outfd = relayd->data_sock.sock.fd;
00e2e675
DG
867 }
868
869error:
870 return outfd;
871}
872
b1316da1
JG
873/*
874 * Write a character on the metadata poll pipe to wake the metadata thread.
875 * Returns 0 on success, -1 on error.
876 */
877int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
878{
879 int ret = 0;
880
28ab034a 881 DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
b1316da1
JG
882 if (channel->monitor && channel->metadata_stream) {
883 const char dummy = 'c';
28ab034a
JG
884 const ssize_t write_ret =
885 lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);
b1316da1
JG
886
887 if (write_ret < 1) {
888 if (errno == EWOULDBLOCK) {
889 /*
890 * This is fine, the metadata poll thread
891 * is having a hard time keeping-up, but
892 * it will eventually wake-up and consume
893 * the available data.
894 */
895 ret = 0;
896 } else {
897 PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
898 ret = -1;
899 goto end;
900 }
901 }
902 }
903
904end:
905 return ret;
906}
907
d2956687
JG
/*
 * Trigger a dump of the metadata content. Following/during the succesful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 *
 * Returns 0 on success, a negative value on error.
 */
static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
        int ret;

        /* Lock preconditions are asserted, not taken here. */
        ASSERT_LOCKED(stream->chan->lock);
        ASSERT_LOCKED(stream->lock);
        LTTNG_ASSERT(stream->metadata_flag);
        LTTNG_ASSERT(stream->chan->trace_chunk);

        /* The dump mechanism differs per tracing domain. */
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                /*
                 * Reset the position of what has been read from the
                 * metadata cache to 0 so we can dump it again.
                 */
                ret = kernctl_metadata_cache_dump(stream->wait_fd);
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
                /*
                 * Reset the position pushed from the metadata cache so it
                 * will write from the beginning on the next push.
                 */
                stream->ust_metadata_pushed = 0;
                ret = consumer_metadata_wakeup_pipe(stream->chan);
                break;
        default:
                ERR("Unknown consumer_data type");
                abort();
        }
        if (ret < 0) {
                ERR("Failed to dump the metadata cache");
        }
        return ret;
}
950
28ab034a
JG
/*
 * Replace the channel's current trace chunk with `new_trace_chunk`.
 *
 * Takes the channel lock. A logically-deleted channel is left untouched
 * (treated as success). A reference is acquired on the new chunk and the
 * reference to the previous chunk (if any) is released.
 *
 * Always returns 0.
 */
static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
                                                  struct lttng_trace_chunk *new_trace_chunk)
{
        pthread_mutex_lock(&channel->lock);
        if (channel->is_deleted) {
                /*
                 * The channel has been logically deleted and should no longer
                 * be used. It has released its reference to its current trace
                 * chunk and should not acquire a new one.
                 *
                 * Return success as there is nothing for the caller to do.
                 */
                goto end;
        }

        /*
         * The acquisition of the reference cannot fail (barring
         * a severe internal error) since a reference to the published
         * chunk is already held by the caller.
         */
        if (new_trace_chunk) {
                const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);

                LTTNG_ASSERT(acquired_reference);
        }

        /* Release the previous chunk (put on NULL is a no-op). */
        lttng_trace_chunk_put(channel->trace_chunk);
        channel->trace_chunk = new_trace_chunk;
end:
        pthread_mutex_unlock(&channel->lock);
        return 0;
}
983
3bd1e081 984/*
ffe60014
DG
985 * Allocate and return a new lttng_consumer_channel object using the given key
986 * to initialize the hash table node.
987 *
988 * On error, return NULL.
3bd1e081 989 */
886224ff 990struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
28ab034a
JG
991 uint64_t session_id,
992 const uint64_t *chunk_id,
993 const char *pathname,
994 const char *name,
995 uint64_t relayd_id,
996 enum lttng_event_output output,
997 uint64_t tracefile_size,
998 uint64_t tracefile_count,
999 uint64_t session_id_per_pid,
1000 unsigned int monitor,
1001 unsigned int live_timer_interval,
1002 bool is_in_live_session,
1003 const char *root_shm_path,
1004 const char *shm_path)
3bd1e081 1005{
cd9adb8b
JG
1006 struct lttng_consumer_channel *channel = nullptr;
1007 struct lttng_trace_chunk *trace_chunk = nullptr;
d2956687
JG
1008
1009 if (chunk_id) {
1010 trace_chunk = lttng_trace_chunk_registry_find_chunk(
28ab034a 1011 the_consumer_data.chunk_registry, session_id, *chunk_id);
d2956687
JG
1012 if (!trace_chunk) {
1013 ERR("Failed to find trace chunk reference during creation of channel");
1014 goto end;
1015 }
1016 }
3bd1e081 1017
64803277 1018 channel = zmalloc<lttng_consumer_channel>();
cd9adb8b 1019 if (channel == nullptr) {
7a57cf92 1020 PERROR("malloc struct lttng_consumer_channel");
3bd1e081
MD
1021 goto end;
1022 }
ffe60014
DG
1023
1024 channel->key = key;
3bd1e081 1025 channel->refcount = 0;
ffe60014 1026 channel->session_id = session_id;
1950109e 1027 channel->session_id_per_pid = session_id_per_pid;
ffe60014 1028 channel->relayd_id = relayd_id;
1624d5b7
JD
1029 channel->tracefile_size = tracefile_size;
1030 channel->tracefile_count = tracefile_count;
2bba9e53 1031 channel->monitor = monitor;
ecc48a90 1032 channel->live_timer_interval = live_timer_interval;
a2814ea7 1033 channel->is_live = is_in_live_session;
cd9adb8b
JG
1034 pthread_mutex_init(&channel->lock, nullptr);
1035 pthread_mutex_init(&channel->timer_lock, nullptr);
ffe60014 1036
0c759fc9
DG
1037 switch (output) {
1038 case LTTNG_EVENT_SPLICE:
1039 channel->output = CONSUMER_CHANNEL_SPLICE;
1040 break;
1041 case LTTNG_EVENT_MMAP:
1042 channel->output = CONSUMER_CHANNEL_MMAP;
1043 break;
1044 default:
a0377dfe 1045 abort();
0c759fc9 1046 free(channel);
cd9adb8b 1047 channel = nullptr;
0c759fc9
DG
1048 goto end;
1049 }
1050
07b86b52
JD
1051 /*
1052 * In monitor mode, the streams associated with the channel will be put in
1053 * a special list ONLY owned by this channel. So, the refcount is set to 1
1054 * here meaning that the channel itself has streams that are referenced.
1055 *
1056 * On a channel deletion, once the channel is no longer visible, the
1057 * refcount is decremented and checked for a zero value to delete it. With
1058 * streams in no monitor mode, it will now be safe to destroy the channel.
1059 */
1060 if (!channel->monitor) {
1061 channel->refcount = 1;
1062 }
1063
ffe60014
DG
1064 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
1065 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
1066
1067 strncpy(channel->name, name, sizeof(channel->name));
1068 channel->name[sizeof(channel->name) - 1] = '\0';
1069
3d071855
MD
1070 if (root_shm_path) {
1071 strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
1072 channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
1073 }
d7ba1388
MD
1074 if (shm_path) {
1075 strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
1076 channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
1077 }
1078
d88aee68 1079 lttng_ht_node_init_u64(&channel->node, channel->key);
28ab034a 1080 lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);
d8ef542d
MD
1081
1082 channel->wait_fd = -1;
ffe60014
DG
1083 CDS_INIT_LIST_HEAD(&channel->streams.head);
1084
d2956687 1085 if (trace_chunk) {
28ab034a 1086 int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
d2956687
JG
1087 if (ret) {
1088 goto error;
1089 }
1090 }
1091
62a7b8ed 1092 DBG("Allocated channel (key %" PRIu64 ")", channel->key);
3bd1e081 1093
3bd1e081 1094end:
d2956687 1095 lttng_trace_chunk_put(trace_chunk);
3bd1e081 1096 return channel;
d2956687
JG
1097error:
1098 consumer_del_channel(channel);
cd9adb8b 1099 channel = nullptr;
d2956687 1100 goto end;
3bd1e081
MD
1101}
1102
/*
 * Add a channel to the global list protected by a mutex.
 *
 * Lock ordering: consumer_data lock, then channel lock, then channel timer
 * lock; the RCU read-side lock is held across the hash table insertions.
 *
 * Always return 0 indicating success.
 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
                         struct lttng_consumer_local_data *ctx)
{
        pthread_mutex_lock(&the_consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->timer_lock);

        /*
         * This gives us a guarantee that the channel we are about to add to the
         * channel hash table will be unique. See this function comment on the why
         * we need to steel the channel key at this stage.
         */
        steal_channel_key(channel->key);

        lttng::urcu::read_lock_guard read_lock;
        lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
        lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
                         &channel->channels_by_session_id_ht_node);
        channel->is_published = true;

        pthread_mutex_unlock(&channel->timer_lock);
        pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&the_consumer_data.lock);

        /* Notify the channel thread that a new data channel is available. */
        if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
                notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }

        return 0;
}
1138
/*
 * Allocate the pollfd structure and the local view of the out fds to avoid
 * doing a lookup in the linked list and concurrency issues when writing is
 * needed. Called with consumer_data.lock held.
 *
 * The caller must have sized *pollfd and local_stream for all streams plus
 * two extra slots: slot [i] receives the consumer data pipe and slot [i + 1]
 * the consumer wakeup pipe.
 *
 * Returns the number of fds in the structures.
 */
static int update_poll_array(struct lttng_consumer_local_data *ctx,
                             struct pollfd **pollfd,
                             struct lttng_consumer_stream **local_stream,
                             struct lttng_ht *ht,
                             int *nb_inactive_fd)
{
        int i = 0;
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;

        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(ht);
        LTTNG_ASSERT(pollfd);
        LTTNG_ASSERT(local_stream);

        DBG("Updating poll fd array");
        *nb_inactive_fd = 0;

        {
                lttng::urcu::read_lock_guard read_lock;
                cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
                        /*
                         * Only active streams with an active end point can be added to the
                         * poll set and local stream storage of the thread.
                         *
                         * There is a potential race here for endpoint_status to be updated
                         * just after the check. However, this is OK since the stream(s) will
                         * be deleted once the thread is notified that the end point state has
                         * changed where this function will be called back again.
                         *
                         * We track the number of inactive FDs because they still need to be
                         * closed by the polling thread after a wakeup on the data_pipe or
                         * metadata_pipe.
                         */
                        if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
                                (*nb_inactive_fd)++;
                                continue;
                        }

                        (*pollfd)[i].fd = stream->wait_fd;
                        (*pollfd)[i].events = POLLIN | POLLPRI;
                        local_stream[i] = stream;
                        i++;
                }
        }

        /*
         * Insert the consumer_data_pipe at the end of the array and don't
         * increment i so nb_fd is the number of real FD.
         */
        (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
        (*pollfd)[i].events = POLLIN | POLLPRI;

        (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
        (*pollfd)[i + 1].events = POLLIN | POLLPRI;
        return i;
}
1203
1204/*
84382d49
MD
1205 * Poll on the should_quit pipe and the command socket return -1 on
1206 * error, 1 if should exit, 0 if data is available on the command socket
3bd1e081
MD
1207 */
1208int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1209{
1210 int num_rdy;
1211
88f2b785 1212restart:
3bd1e081
MD
1213 num_rdy = poll(consumer_sockpoll, 2, -1);
1214 if (num_rdy == -1) {
88f2b785
MD
1215 /*
1216 * Restart interrupted system call.
1217 */
1218 if (errno == EINTR) {
1219 goto restart;
1220 }
7a57cf92 1221 PERROR("Poll error");
84382d49 1222 return -1;
3bd1e081 1223 }
509bb1cf 1224 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
3bd1e081 1225 DBG("consumer_should_quit wake up");
84382d49 1226 return 1;
3bd1e081
MD
1227 }
1228 return 0;
3bd1e081
MD
1229}
1230
/*
 * Set the error socket.
 *
 * Simple setter; `sock` is the fd used by lttng_consumer_send_error() to
 * report return codes to the session daemon.
 */
void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
{
        ctx->consumer_error_socket = sock;
}
1238
/*
 * Set the command socket path.
 *
 * Simple setter; the string is not copied, so `sock` must outlive `ctx`
 * (it is unlink()ed by lttng_consumer_destroy()).
 */
void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
{
        ctx->consumer_command_sock_path = sock;
}
1246
1247/*
1248 * Send return code to the session daemon.
1249 * If the socket is not defined, we return 0, it is not a fatal error
1250 */
ffe60014 1251int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
3bd1e081
MD
1252{
1253 if (ctx->consumer_error_socket > 0) {
28ab034a
JG
1254 return lttcomm_send_unix_sock(
1255 ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
3bd1e081
MD
1256 }
1257
1258 return 0;
1259}
1260
/*
 * Close all the tracefiles and stream fds and MUST be called when all
 * instances are destroyed i.e. when all threads were joined and are ended.
 *
 * Tears down the global channel hash tables, the relayd table, the
 * per-channel stream table and the trace chunk registry, in that order.
 */
void lttng_consumer_cleanup()
{
        struct lttng_ht_iter iter;
        struct lttng_consumer_channel *channel;
        unsigned int trace_chunks_left;

        {
                lttng::urcu::read_lock_guard read_lock;

                cds_lfht_for_each_entry (
                        the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
                        consumer_del_channel(channel);
                }
        }

        lttng_ht_destroy(the_consumer_data.channel_ht);
        lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);

        cleanup_relayd_ht();

        lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);

        /*
         * This HT contains streams that are freed by either the metadata thread or
         * the data thread so we do *nothing* on the hash table and simply destroy
         * it.
         */
        lttng_ht_destroy(the_consumer_data.stream_list_ht);

        /*
         * Trace chunks in the registry may still exist if the session
         * daemon has encountered an internal error and could not
         * tear down its sessions and/or trace chunks properly.
         *
         * Release the session daemon's implicit reference to any remaining
         * trace chunk and print an error if any trace chunk was found. Note
         * that there are _no_ legitimate cases for trace chunks to be left,
         * it is a leak. However, it can happen following a crash of the
         * session daemon and not emptying the registry would cause an assertion
         * to hit.
         */
        trace_chunks_left =
                lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
        if (trace_chunks_left) {
                ERR("%u trace chunks are leaked by lttng-consumerd. "
                    "This can be caused by an internal error of the session daemon.",
                    trace_chunks_left);
        }
        /* Run all callbacks freeing each chunk. */
        rcu_barrier();
        lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
}
1317
/*
 * Called from signal handler.
 *
 * Sets the global quit flag and wakes the polling threads by writing a byte
 * on the should_quit pipe. Only async-signal-safe operations are used here.
 */
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
{
        ssize_t ret;

        CMM_STORE_SHARED(consumer_quit, 1);
        ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
        if (ret < 1) {
                PERROR("write consumer quit");
        }

        DBG("Consumer flag that it should quit");
}
1333
5199ffc4
JG
/*
 * Flush pending writes to trace output disk file.
 *
 * Best-effort page-cache management: errors from the sync/fadvise hints are
 * ignored (except for logging) since they only affect performance.
 */
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
        int ret;
        int outfd = stream->out_fd;

        /*
         * This does a blocking write-and-wait on any page that belongs to the
         * subbuffer prior to the one we just wrote.
         * Don't care about error values, as these are just hints and ways to
         * limit the amount of page cache used.
         */
        if (orig_offset < stream->max_sb_size) {
                /* Nothing before the current subbuffer to flush yet. */
                return;
        }
        lttng_sync_file_range(outfd,
                              orig_offset - stream->max_sb_size,
                              stream->max_sb_size,
                              SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
                                      SYNC_FILE_RANGE_WAIT_AFTER);
        /*
         * Give hints to the kernel about how we access the file:
         * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
         * we write it.
         *
         * We need to call fadvise again after the file grows because the
         * kernel does not seem to apply fadvise to non-existing parts of the
         * file.
         *
         * Call fadvise _after_ having waited for the page writeback to
         * complete because the dirty page writeback semantic is not well
         * defined. So it can be expected to lead to lower throughput in
         * streaming.
         */
        ret = posix_fadvise(
                outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
        if (ret && ret != -ENOSYS) {
                /* posix_fadvise() returns the error instead of setting errno. */
                errno = ret;
                PERROR("posix_fadvise on fd %i", outfd);
        }
}
1377
/*
 * Initialise the necessary environnement :
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument, this function is called when data is
 * available on a buffer. This function is responsible to do the
 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
 * buffer configuration and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *
lttng_consumer_create(enum lttng_consumer_type type,
                      ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
                                              struct lttng_consumer_local_data *ctx,
                                              bool locked_by_caller),
                      int (*recv_channel)(struct lttng_consumer_channel *channel),
                      int (*recv_stream)(struct lttng_consumer_stream *stream),
                      int (*update_stream)(uint64_t stream_key, uint32_t state))
{
        int ret;
        struct lttng_consumer_local_data *ctx;

        /* The global consumer type may only be set once per process. */
        LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
                     the_consumer_data.type == type);
        the_consumer_data.type = type;

        ctx = zmalloc<lttng_consumer_local_data>();
        if (ctx == nullptr) {
                PERROR("allocating context");
                goto error;
        }

        ctx->consumer_error_socket = -1;
        ctx->consumer_metadata_socket = -1;
        pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
        /* assign the callbacks */
        ctx->on_buffer_ready = buffer_ready;
        ctx->on_recv_channel = recv_channel;
        ctx->on_recv_stream = recv_stream;
        ctx->on_update_stream = update_stream;

        ctx->consumer_data_pipe = lttng_pipe_open(0);
        if (!ctx->consumer_data_pipe) {
                goto error_poll_pipe;
        }

        ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
        if (!ctx->consumer_wakeup_pipe) {
                goto error_wakeup_pipe;
        }

        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
                goto error_quit_pipe;
        }

        ret = pipe(ctx->consumer_channel_pipe);
        if (ret < 0) {
                PERROR("Error creating channel pipe");
                goto error_channel_pipe;
        }

        ctx->consumer_metadata_pipe = lttng_pipe_open(0);
        if (!ctx->consumer_metadata_pipe) {
                goto error_metadata_pipe;
        }

        ctx->channel_monitor_pipe = -1;

        return ctx;

        /* Unwind in reverse order of acquisition. */
error_metadata_pipe:
        utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
error_wakeup_pipe:
        lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
        free(ctx);
error:
        return nullptr;
}
1467
282dadbc
MD
1468/*
1469 * Iterate over all streams of the hashtable and free them properly.
1470 */
1471static void destroy_data_stream_ht(struct lttng_ht *ht)
1472{
1473 struct lttng_ht_iter iter;
1474 struct lttng_consumer_stream *stream;
1475
cd9adb8b 1476 if (ht == nullptr) {
282dadbc
MD
1477 return;
1478 }
1479
56047f5a
JG
1480 {
1481 lttng::urcu::read_lock_guard read_lock;
1482 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1483 /*
1484 * Ignore return value since we are currently cleaning up so any error
1485 * can't be handled.
1486 */
1487 (void) consumer_del_stream(stream, ht);
1488 }
282dadbc 1489 }
282dadbc
MD
1490
1491 lttng_ht_destroy(ht);
1492}
1493
1494/*
1495 * Iterate over all streams of the metadata hashtable and free them
1496 * properly.
1497 */
1498static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1499{
1500 struct lttng_ht_iter iter;
1501 struct lttng_consumer_stream *stream;
1502
cd9adb8b 1503 if (ht == nullptr) {
282dadbc
MD
1504 return;
1505 }
1506
56047f5a
JG
1507 {
1508 lttng::urcu::read_lock_guard read_lock;
1509 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1510 /*
1511 * Ignore return value since we are currently cleaning up so any error
1512 * can't be handled.
1513 */
1514 (void) consumer_del_metadata_stream(stream, ht);
1515 }
282dadbc 1516 }
282dadbc
MD
1517
1518 lttng_ht_destroy(ht);
1519}
1520
3bd1e081
MD
/*
 * Close all fds associated with the instance and free the context.
 *
 * Counterpart of lttng_consumer_create(); also destroys the global data and
 * metadata stream hash tables before releasing the context's fds and pipes.
 */
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
{
        int ret;

        DBG("Consumer destroying it. Closing everything.");

        if (!ctx) {
                return;
        }

        destroy_data_stream_ht(data_ht);
        destroy_metadata_stream_ht(metadata_ht);

        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
        }
        ret = close(ctx->consumer_metadata_socket);
        if (ret) {
                PERROR("close");
        }
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
        utils_close_pipe(ctx->consumer_should_quit);

        /* Remove the command socket file from the filesystem. */
        unlink(ctx->consumer_command_sock_path);
        free(ctx);
}
1554
6197aea7
DG
1555/*
1556 * Write the metadata stream id on the specified file descriptor.
1557 */
28ab034a
JG
1558static int
1559write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
6197aea7 1560{
6cd525e8 1561 ssize_t ret;
1d4dfdef 1562 struct lttcomm_relayd_metadata_payload hdr;
6197aea7 1563
1d4dfdef
DG
1564 hdr.stream_id = htobe64(stream->relayd_stream_id);
1565 hdr.padding_size = htobe32(padding);
6cd525e8
MD
1566 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1567 if (ret < sizeof(hdr)) {
d7b75ec8 1568 /*
6f04ed72 1569 * This error means that the fd's end is closed so ignore the PERROR
d7b75ec8
DG
1570 * not to clubber the error output since this can happen in a normal
1571 * code path.
1572 */
1573 if (errno != EPIPE) {
1574 PERROR("write metadata stream id");
1575 }
1576 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
534d2592
DG
1577 /*
1578 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1579 * handle writting the missing part so report that as an error and
1580 * don't lie to the caller.
1581 */
1582 ret = -1;
6197aea7
DG
1583 goto end;
1584 }
1d4dfdef 1585 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
28ab034a
JG
1586 stream->relayd_stream_id,
1587 padding);
6197aea7
DG
1588
1589end:
6cd525e8 1590 return (int) ret;
6197aea7
DG
1591}
1592
3bd1e081 1593/*
09e26845
DG
1594 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1595 * core function for writing trace buffers to either the local filesystem or
1596 * the network.
1597 *
d2956687 1598 * It must be called with the stream and the channel lock held.
79d4ffb7 1599 *
09e26845 1600 * Careful review MUST be put if any changes occur!
3bd1e081
MD
1601 *
1602 * Returns the number of bytes written
1603 */
28ab034a
JG
1604ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
1605 const struct lttng_buffer_view *buffer,
1606 unsigned long padding)
3bd1e081 1607{
994ab360 1608 ssize_t ret = 0;
f02e1e8a
DG
1609 off_t orig_offset = stream->out_fd_offset;
1610 /* Default is on the disk */
1611 int outfd = stream->out_fd;
cd9adb8b 1612 struct consumer_relayd_sock_pair *relayd = nullptr;
8994307f 1613 unsigned int relayd_hang_up = 0;
fd424d99
JG
1614 const size_t subbuf_content_size = buffer->size - padding;
1615 size_t write_len;
f02e1e8a
DG
1616
1617 /* RCU lock for the relayd pointer */
56047f5a 1618 lttng::urcu::read_lock_guard read_lock;
28ab034a 1619 LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
d2956687 1620
f02e1e8a 1621 /* Flag that the current stream if set for network streaming. */
da009f2c 1622 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1623 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1624 if (relayd == nullptr) {
56591bac 1625 ret = -EPIPE;
f02e1e8a
DG
1626 goto end;
1627 }
1628 }
1629
f02e1e8a
DG
1630 /* Handle stream on the relayd if the output is on the network */
1631 if (relayd) {
fd424d99 1632 unsigned long netlen = subbuf_content_size;
f02e1e8a
DG
1633
1634 /*
1635 * Lock the control socket for the complete duration of the function
1636 * since from this point on we will use the socket.
1637 */
1638 if (stream->metadata_flag) {
1639 /* Metadata requires the control socket. */
1640 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
93ec662e
JD
1641 if (stream->reset_metadata_flag) {
1642 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1643 stream->relayd_stream_id,
1644 stream->metadata_version);
93ec662e
JD
1645 if (ret < 0) {
1646 relayd_hang_up = 1;
1647 goto write_error;
1648 }
1649 stream->reset_metadata_flag = 0;
1650 }
1d4dfdef 1651 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
f02e1e8a
DG
1652 }
1653
1d4dfdef 1654 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
994ab360
DG
1655 if (ret < 0) {
1656 relayd_hang_up = 1;
1657 goto write_error;
1658 }
1659 /* Use the returned socket. */
1660 outfd = ret;
f02e1e8a 1661
994ab360
DG
1662 /* Write metadata stream id before payload */
1663 if (stream->metadata_flag) {
239f61af 1664 ret = write_relayd_metadata_id(outfd, stream, padding);
994ab360 1665 if (ret < 0) {
8994307f
DG
1666 relayd_hang_up = 1;
1667 goto write_error;
1668 }
f02e1e8a 1669 }
1624d5b7 1670
fd424d99
JG
1671 write_len = subbuf_content_size;
1672 } else {
1673 /* No streaming; we have to write the full padding. */
93ec662e
JD
1674 if (stream->metadata_flag && stream->reset_metadata_flag) {
1675 ret = utils_truncate_stream_file(stream->out_fd, 0);
1676 if (ret < 0) {
1677 ERR("Reset metadata file");
1678 goto end;
1679 }
1680 stream->reset_metadata_flag = 0;
1681 }
1682
1624d5b7
JD
1683 /*
1684 * Check if we need to change the tracefile before writing the packet.
1685 */
1686 if (stream->chan->tracefile_size > 0 &&
28ab034a
JG
1687 (stream->tracefile_size_current + buffer->size) >
1688 stream->chan->tracefile_size) {
d2956687
JG
1689 ret = consumer_stream_rotate_output_files(stream);
1690 if (ret) {
1624d5b7
JD
1691 goto end;
1692 }
309167d2 1693 outfd = stream->out_fd;
a1ae300f 1694 orig_offset = 0;
1624d5b7 1695 }
fd424d99 1696 stream->tracefile_size_current += buffer->size;
fd424d99 1697 write_len = buffer->size;
f02e1e8a
DG
1698 }
1699
d02b8372
DG
1700 /*
1701 * This call guarantee that len or less is returned. It's impossible to
1702 * receive a ret value that is bigger than len.
1703 */
fd424d99 1704 ret = lttng_write(outfd, buffer->data, write_len);
e2d1190b 1705 DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
fd424d99 1706 if (ret < 0 || ((size_t) ret != write_len)) {
d02b8372
DG
1707 /*
1708 * Report error to caller if nothing was written else at least send the
1709 * amount written.
1710 */
1711 if (ret < 0) {
994ab360 1712 ret = -errno;
f02e1e8a 1713 }
994ab360 1714 relayd_hang_up = 1;
f02e1e8a 1715
d02b8372 1716 /* Socket operation failed. We consider the relayd dead */
fcf0f774 1717 if (errno == EPIPE) {
d02b8372
DG
1718 /*
1719 * This is possible if the fd is closed on the other side
1720 * (outfd) or any write problem. It can be verbose a bit for a
1721 * normal execution if for instance the relayd is stopped
1722 * abruptly. This can happen so set this to a DBG statement.
1723 */
1724 DBG("Consumer mmap write detected relayd hang up");
994ab360
DG
1725 } else {
1726 /* Unhandled error, print it and stop function right now. */
28ab034a 1727 PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
f02e1e8a 1728 }
994ab360 1729 goto write_error;
d02b8372
DG
1730 }
1731 stream->output_written += ret;
d02b8372
DG
1732
1733 /* This call is useless on a socket so better save a syscall. */
1734 if (!relayd) {
1735 /* This won't block, but will start writeout asynchronously */
28ab034a
JG
1736 lttng_sync_file_range(
1737 outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
fd424d99 1738 stream->out_fd_offset += write_len;
f5dbe415 1739 lttng_consumer_sync_trace_file(stream, orig_offset);
f02e1e8a 1740 }
f02e1e8a 1741
8994307f
DG
1742write_error:
1743 /*
1744 * This is a special case that the relayd has closed its socket. Let's
1745 * cleanup the relayd object and all associated streams.
1746 */
1747 if (relayd && relayd_hang_up) {
28ab034a 1748 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1749 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1750 }
1751
f02e1e8a
DG
1752end:
1753 /* Unlock only if ctrl socket used */
1754 if (relayd && stream->metadata_flag) {
1755 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1756 }
1757
994ab360 1758 return ret;
3bd1e081
MD
1759}
1760
1761/*
1762 * Splice the data from the ring buffer to the tracefile.
1763 *
79d4ffb7
DG
1764 * It must be called with the stream lock held.
1765 *
3bd1e081
MD
1766 * Returns the number of bytes spliced.
1767 */
28ab034a
JG
1768ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
1769 struct lttng_consumer_stream *stream,
1770 unsigned long len,
1771 unsigned long padding)
3bd1e081 1772{
f02e1e8a
DG
1773 ssize_t ret = 0, written = 0, ret_splice = 0;
1774 loff_t offset = 0;
1775 off_t orig_offset = stream->out_fd_offset;
1776 int fd = stream->wait_fd;
1777 /* Default is on the disk */
1778 int outfd = stream->out_fd;
cd9adb8b 1779 struct consumer_relayd_sock_pair *relayd = nullptr;
fb3a43a9 1780 int *splice_pipe;
8994307f 1781 unsigned int relayd_hang_up = 0;
f02e1e8a 1782
fa29bfbf 1783 switch (the_consumer_data.type) {
3bd1e081 1784 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1785 break;
7753dea8
MD
1786 case LTTNG_CONSUMER32_UST:
1787 case LTTNG_CONSUMER64_UST:
f02e1e8a 1788 /* Not supported for user space tracing */
3bd1e081
MD
1789 return -ENOSYS;
1790 default:
1791 ERR("Unknown consumer_data type");
a0377dfe 1792 abort();
3bd1e081
MD
1793 }
1794
f02e1e8a 1795 /* RCU lock for the relayd pointer */
56047f5a 1796 lttng::urcu::read_lock_guard read_lock;
f02e1e8a
DG
1797
1798 /* Flag that the current stream if set for network streaming. */
da009f2c 1799 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1800 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1801 if (relayd == nullptr) {
ad0b0d23 1802 written = -ret;
f02e1e8a
DG
1803 goto end;
1804 }
1805 }
a2361a61 1806 splice_pipe = stream->splice_pipe;
fb3a43a9 1807
f02e1e8a 1808 /* Write metadata stream id before payload */
1d4dfdef 1809 if (relayd) {
ad0b0d23 1810 unsigned long total_len = len;
f02e1e8a 1811
1d4dfdef
DG
1812 if (stream->metadata_flag) {
1813 /*
1814 * Lock the control socket for the complete duration of the function
1815 * since from this point on we will use the socket.
1816 */
1817 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1818
93ec662e
JD
1819 if (stream->reset_metadata_flag) {
1820 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1821 stream->relayd_stream_id,
1822 stream->metadata_version);
93ec662e
JD
1823 if (ret < 0) {
1824 relayd_hang_up = 1;
1825 goto write_error;
1826 }
1827 stream->reset_metadata_flag = 0;
1828 }
28ab034a 1829 ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
1d4dfdef
DG
1830 if (ret < 0) {
1831 written = ret;
ad0b0d23
DG
1832 relayd_hang_up = 1;
1833 goto write_error;
1d4dfdef
DG
1834 }
1835
1836 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1837 }
1838
1839 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
ad0b0d23
DG
1840 if (ret < 0) {
1841 written = ret;
1842 relayd_hang_up = 1;
1843 goto write_error;
f02e1e8a 1844 }
ad0b0d23
DG
1845 /* Use the returned socket. */
1846 outfd = ret;
1d4dfdef
DG
1847 } else {
1848 /* No streaming, we have to set the len with the full padding */
1849 len += padding;
1624d5b7 1850
93ec662e
JD
1851 if (stream->metadata_flag && stream->reset_metadata_flag) {
1852 ret = utils_truncate_stream_file(stream->out_fd, 0);
1853 if (ret < 0) {
1854 ERR("Reset metadata file");
1855 goto end;
1856 }
1857 stream->reset_metadata_flag = 0;
1858 }
1624d5b7
JD
1859 /*
1860 * Check if we need to change the tracefile before writing the packet.
1861 */
1862 if (stream->chan->tracefile_size > 0 &&
28ab034a 1863 (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
d2956687 1864 ret = consumer_stream_rotate_output_files(stream);
1624d5b7 1865 if (ret < 0) {
ad0b0d23 1866 written = ret;
1624d5b7
JD
1867 goto end;
1868 }
309167d2 1869 outfd = stream->out_fd;
a1ae300f 1870 orig_offset = 0;
1624d5b7
JD
1871 }
1872 stream->tracefile_size_current += len;
f02e1e8a
DG
1873 }
1874
1875 while (len > 0) {
1d4dfdef 1876 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
28ab034a
JG
1877 (unsigned long) offset,
1878 len,
1879 fd,
1880 splice_pipe[1]);
1881 ret_splice = splice(
cd9adb8b 1882 fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
f02e1e8a
DG
1883 DBG("splice chan to pipe, ret %zd", ret_splice);
1884 if (ret_splice < 0) {
d02b8372 1885 ret = errno;
ad0b0d23 1886 written = -ret;
d02b8372 1887 PERROR("Error in relay splice");
f02e1e8a
DG
1888 goto splice_error;
1889 }
1890
1891 /* Handle stream on the relayd if the output is on the network */
ad0b0d23
DG
1892 if (relayd && stream->metadata_flag) {
1893 size_t metadata_payload_size =
1894 sizeof(struct lttcomm_relayd_metadata_payload);
1895
1896 /* Update counter to fit the spliced data */
1897 ret_splice += metadata_payload_size;
1898 len += metadata_payload_size;
1899 /*
1900 * We do this so the return value can match the len passed as
1901 * argument to this function.
1902 */
1903 written -= metadata_payload_size;
f02e1e8a
DG
1904 }
1905
1906 /* Splice data out */
28ab034a 1907 ret_splice = splice(splice_pipe[0],
cd9adb8b 1908 nullptr,
28ab034a 1909 outfd,
cd9adb8b 1910 nullptr,
28ab034a
JG
1911 ret_splice,
1912 SPLICE_F_MOVE | SPLICE_F_MORE);
1913 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
f02e1e8a 1914 if (ret_splice < 0) {
d02b8372 1915 ret = errno;
ad0b0d23
DG
1916 written = -ret;
1917 relayd_hang_up = 1;
1918 goto write_error;
f02e1e8a 1919 } else if (ret_splice > len) {
d02b8372
DG
1920 /*
1921 * We don't expect this code path to be executed but you never know
1922 * so this is an extra protection agains a buggy splice().
1923 */
f02e1e8a 1924 ret = errno;
ad0b0d23 1925 written += ret_splice;
28ab034a 1926 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
f02e1e8a 1927 goto splice_error;
d02b8372
DG
1928 } else {
1929 /* All good, update current len and continue. */
1930 len -= ret_splice;
f02e1e8a 1931 }
f02e1e8a
DG
1932
1933 /* This call is useless on a socket so better save a syscall. */
1934 if (!relayd) {
1935 /* This won't block, but will start writeout asynchronously */
28ab034a
JG
1936 lttng_sync_file_range(
1937 outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
f02e1e8a
DG
1938 stream->out_fd_offset += ret_splice;
1939 }
e5d1a9b3 1940 stream->output_written += ret_splice;
f02e1e8a
DG
1941 written += ret_splice;
1942 }
f5dbe415
JG
1943 if (!relayd) {
1944 lttng_consumer_sync_trace_file(stream, orig_offset);
1945 }
f02e1e8a
DG
1946 goto end;
1947
8994307f
DG
1948write_error:
1949 /*
1950 * This is a special case that the relayd has closed its socket. Let's
1951 * cleanup the relayd object and all associated streams.
1952 */
1953 if (relayd && relayd_hang_up) {
28ab034a 1954 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1955 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1956 /* Skip splice error so the consumer does not fail */
1957 goto end;
1958 }
1959
f02e1e8a
DG
1960splice_error:
1961 /* send the appropriate error description to sessiond */
1962 switch (ret) {
f02e1e8a 1963 case EINVAL:
f73fabfd 1964 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
1965 break;
1966 case ENOMEM:
f73fabfd 1967 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
1968 break;
1969 case ESPIPE:
f73fabfd 1970 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
1971 break;
1972 }
1973
1974end:
1975 if (relayd && stream->metadata_flag) {
1976 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1977 }
1978
f02e1e8a 1979 return written;
3bd1e081
MD
1980}
1981
15055ce5
JD
1982/*
1983 * Sample the snapshot positions for a specific fd
1984 *
1985 * Returns 0 on success, < 0 on error
1986 */
1987int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
1988{
fa29bfbf 1989 switch (the_consumer_data.type) {
15055ce5
JD
1990 case LTTNG_CONSUMER_KERNEL:
1991 return lttng_kconsumer_sample_snapshot_positions(stream);
1992 case LTTNG_CONSUMER32_UST:
1993 case LTTNG_CONSUMER64_UST:
1994 return lttng_ustconsumer_sample_snapshot_positions(stream);
1995 default:
1996 ERR("Unknown consumer_data type");
a0377dfe 1997 abort();
15055ce5
JD
1998 return -ENOSYS;
1999 }
2000}
3bd1e081
MD
2001/*
2002 * Take a snapshot for a specific fd
2003 *
2004 * Returns 0 on success, < 0 on error
2005 */
ffe60014 2006int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 2007{
fa29bfbf 2008 switch (the_consumer_data.type) {
3bd1e081 2009 case LTTNG_CONSUMER_KERNEL:
ffe60014 2010 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
2011 case LTTNG_CONSUMER32_UST:
2012 case LTTNG_CONSUMER64_UST:
ffe60014 2013 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
2014 default:
2015 ERR("Unknown consumer_data type");
a0377dfe 2016 abort();
3bd1e081
MD
2017 return -ENOSYS;
2018 }
3bd1e081
MD
2019}
2020
2021/*
2022 * Get the produced position
2023 *
2024 * Returns 0 on success, < 0 on error
2025 */
28ab034a 2026int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 2027{
fa29bfbf 2028 switch (the_consumer_data.type) {
3bd1e081 2029 case LTTNG_CONSUMER_KERNEL:
ffe60014 2030 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
2031 case LTTNG_CONSUMER32_UST:
2032 case LTTNG_CONSUMER64_UST:
ffe60014 2033 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
2034 default:
2035 ERR("Unknown consumer_data type");
a0377dfe 2036 abort();
3bd1e081
MD
2037 return -ENOSYS;
2038 }
2039}
2040
15055ce5
JD
2041/*
2042 * Get the consumed position (free-running counter position in bytes).
2043 *
2044 * Returns 0 on success, < 0 on error
2045 */
28ab034a 2046int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
15055ce5 2047{
fa29bfbf 2048 switch (the_consumer_data.type) {
15055ce5
JD
2049 case LTTNG_CONSUMER_KERNEL:
2050 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2051 case LTTNG_CONSUMER32_UST:
2052 case LTTNG_CONSUMER64_UST:
2053 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2054 default:
2055 ERR("Unknown consumer_data type");
a0377dfe 2056 abort();
15055ce5
JD
2057 return -ENOSYS;
2058 }
2059}
2060
3bd1e081 2061int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
28ab034a
JG
2062 int sock,
2063 struct pollfd *consumer_sockpoll)
3bd1e081 2064{
fa29bfbf 2065 switch (the_consumer_data.type) {
3bd1e081
MD
2066 case LTTNG_CONSUMER_KERNEL:
2067 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
2068 case LTTNG_CONSUMER32_UST:
2069 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
2070 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2071 default:
2072 ERR("Unknown consumer_data type");
a0377dfe 2073 abort();
3bd1e081
MD
2074 return -ENOSYS;
2075 }
2076}
2077
cd9adb8b 2078static void lttng_consumer_close_all_metadata()
d88aee68 2079{
fa29bfbf 2080 switch (the_consumer_data.type) {
d88aee68
DG
2081 case LTTNG_CONSUMER_KERNEL:
2082 /*
2083 * The Kernel consumer has a different metadata scheme so we don't
2084 * close anything because the stream will be closed by the session
2085 * daemon.
2086 */
2087 break;
2088 case LTTNG_CONSUMER32_UST:
2089 case LTTNG_CONSUMER64_UST:
2090 /*
2091 * Close all metadata streams. The metadata hash table is passed and
2092 * this call iterates over it by closing all wakeup fd. This is safe
2093 * because at this point we are sure that the metadata producer is
2094 * either dead or blocked.
2095 */
6d574024 2096 lttng_ustconsumer_close_all_metadata(metadata_ht);
d88aee68
DG
2097 break;
2098 default:
2099 ERR("Unknown consumer_data type");
a0377dfe 2100 abort();
d88aee68
DG
2101 }
2102}
2103
fb3a43a9
DG
2104/*
2105 * Clean up a metadata stream and free its memory.
2106 */
28ab034a 2107void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
fb3a43a9 2108{
cd9adb8b 2109 struct lttng_consumer_channel *channel = nullptr;
a6ef8ee6 2110 bool free_channel = false;
fb3a43a9 2111
a0377dfe 2112 LTTNG_ASSERT(stream);
fb3a43a9
DG
2113 /*
2114 * This call should NEVER receive regular stream. It must always be
2115 * metadata stream and this is crucial for data structure synchronization.
2116 */
a0377dfe 2117 LTTNG_ASSERT(stream->metadata_flag);
fb3a43a9 2118
e316aad5
DG
2119 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
2120
fa29bfbf 2121 pthread_mutex_lock(&the_consumer_data.lock);
a6ef8ee6
JG
2122 /*
2123 * Note that this assumes that a stream's channel is never changed and
2124 * that the stream's lock doesn't need to be taken to sample its
2125 * channel.
2126 */
2127 channel = stream->chan;
2128 pthread_mutex_lock(&channel->lock);
3dad2c0f 2129 pthread_mutex_lock(&stream->lock);
a6ef8ee6 2130 if (channel->metadata_cache) {
081424af 2131 /* Only applicable to userspace consumers. */
a6ef8ee6 2132 pthread_mutex_lock(&channel->metadata_cache->lock);
081424af 2133 }
8994307f 2134
6d574024
DG
2135 /* Remove any reference to that stream. */
2136 consumer_stream_delete(stream, ht);
ca22feea 2137
6d574024 2138 /* Close down everything including the relayd if one. */
d119bd01 2139 consumer_stream_close_output(stream);
6d574024
DG
2140 /* Destroy tracer buffers of the stream. */
2141 consumer_stream_destroy_buffers(stream);
fb3a43a9
DG
2142
2143 /* Atomically decrement channel refcount since other threads can use it. */
28ab034a
JG
2144 if (!uatomic_sub_return(&channel->refcount, 1) &&
2145 !uatomic_read(&channel->nb_init_stream_left)) {
c30aaa51 2146 /* Go for channel deletion! */
a6ef8ee6 2147 free_channel = true;
fb3a43a9 2148 }
cd9adb8b 2149 stream->chan = nullptr;
fb3a43a9 2150
73811ecc
DG
2151 /*
2152 * Nullify the stream reference so it is not used after deletion. The
6d574024
DG
2153 * channel lock MUST be acquired before being able to check for a NULL
2154 * pointer value.
73811ecc 2155 */
cd9adb8b 2156 channel->metadata_stream = nullptr;
73811ecc 2157
a6ef8ee6
JG
2158 if (channel->metadata_cache) {
2159 pthread_mutex_unlock(&channel->metadata_cache->lock);
081424af 2160 }
3dad2c0f 2161 pthread_mutex_unlock(&stream->lock);
a6ef8ee6 2162 pthread_mutex_unlock(&channel->lock);
fa29bfbf 2163 pthread_mutex_unlock(&the_consumer_data.lock);
e316aad5 2164
a6ef8ee6
JG
2165 if (free_channel) {
2166 consumer_del_channel(channel);
e316aad5
DG
2167 }
2168
d2956687 2169 lttng_trace_chunk_put(stream->trace_chunk);
cd9adb8b 2170 stream->trace_chunk = nullptr;
6d574024 2171 consumer_stream_free(stream);
fb3a43a9
DG
2172}
2173
2174/*
2175 * Action done with the metadata stream when adding it to the consumer internal
2176 * data structures to handle it.
2177 */
66d583dc 2178void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
fb3a43a9 2179{
5ab66908 2180 struct lttng_ht *ht = metadata_ht;
76082088 2181 struct lttng_ht_iter iter;
d88aee68 2182 struct lttng_ht_node_u64 *node;
fb3a43a9 2183
a0377dfe
FD
2184 LTTNG_ASSERT(stream);
2185 LTTNG_ASSERT(ht);
e316aad5 2186
d88aee68 2187 DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
e316aad5 2188
fa29bfbf 2189 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 2190 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 2191 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 2192 pthread_mutex_lock(&stream->lock);
e316aad5 2193
e316aad5
DG
2194 /*
2195 * From here, refcounts are updated so be _careful_ when returning an error
2196 * after this point.
2197 */
2198
56047f5a 2199 lttng::urcu::read_lock_guard read_lock;
76082088
DG
2200
2201 /*
2202 * Lookup the stream just to make sure it does not exist in our internal
2203 * state. This should NEVER happen.
2204 */
d88aee68
DG
2205 lttng_ht_lookup(ht, &stream->key, &iter);
2206 node = lttng_ht_iter_get_node_u64(&iter);
a0377dfe 2207 LTTNG_ASSERT(!node);
76082088 2208
e316aad5 2209 /*
ffe60014
DG
2210 * When nb_init_stream_left reaches 0, we don't need to trigger any action
2211 * in terms of destroying the associated channel, because the action that
e316aad5
DG
2212 * causes the count to become 0 also causes a stream to be added. The
2213 * channel deletion will thus be triggered by the following removal of this
2214 * stream.
2215 */
ffe60014 2216 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
2217 /* Increment refcount before decrementing nb_init_stream_left */
2218 cmm_smp_wmb();
ffe60014 2219 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
2220 }
2221
d88aee68 2222 lttng_ht_add_unique_u64(ht, &stream->node);
ca22feea 2223
28ab034a 2224 lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
d8ef542d 2225
ca22feea
DG
2226 /*
2227 * Add stream to the stream_list_ht of the consumer data. No need to steal
2228 * the key since the HT does not use it and we allow to add redundant keys
2229 * into this table.
2230 */
28ab034a 2231 lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 2232
2e818a6a 2233 pthread_mutex_unlock(&stream->lock);
a9838785 2234 pthread_mutex_unlock(&stream->chan->lock);
ec6ea7d0 2235 pthread_mutex_unlock(&stream->chan->timer_lock);
fa29bfbf 2236 pthread_mutex_unlock(&the_consumer_data.lock);
fb3a43a9
DG
2237}
2238
8994307f
DG
2239/*
2240 * Delete data stream that are flagged for deletion (endpoint_status).
2241 */
cd9adb8b 2242static void validate_endpoint_status_data_stream()
8994307f
DG
2243{
2244 struct lttng_ht_iter iter;
2245 struct lttng_consumer_stream *stream;
2246
2247 DBG("Consumer delete flagged data stream");
2248
56047f5a
JG
2249 {
2250 lttng::urcu::read_lock_guard read_lock;
2251
2252 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
2253 /* Validate delete flag of the stream */
2254 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2255 continue;
2256 }
2257 /* Delete it right now */
2258 consumer_del_stream(stream, data_ht);
8994307f 2259 }
8994307f 2260 }
8994307f
DG
2261}
2262
2263/*
2264 * Delete metadata stream that are flagged for deletion (endpoint_status).
2265 */
28ab034a 2266static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
8994307f
DG
2267{
2268 struct lttng_ht_iter iter;
2269 struct lttng_consumer_stream *stream;
2270
2271 DBG("Consumer delete flagged metadata stream");
2272
a0377dfe 2273 LTTNG_ASSERT(pollset);
8994307f 2274
56047f5a
JG
2275 {
2276 lttng::urcu::read_lock_guard read_lock;
2277 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
2278 /* Validate delete flag of the stream */
2279 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2280 continue;
2281 }
2282 /*
2283 * Remove from pollset so the metadata thread can continue without
2284 * blocking on a deleted stream.
2285 */
2286 lttng_poll_del(pollset, stream->wait_fd);
8994307f 2287
56047f5a
JG
2288 /* Delete it right now */
2289 consumer_del_metadata_stream(stream, metadata_ht);
2290 }
8994307f 2291 }
8994307f
DG
2292}
2293
fb3a43a9
DG
2294/*
2295 * Thread polls on metadata file descriptor and write them on disk or on the
2296 * network.
2297 */
7d980def 2298void *consumer_thread_metadata_poll(void *data)
fb3a43a9 2299{
1fc79fb4 2300 int ret, i, pollfd, err = -1;
fb3a43a9 2301 uint32_t revents, nb_fd;
cd9adb8b 2302 struct lttng_consumer_stream *stream = nullptr;
fb3a43a9 2303 struct lttng_ht_iter iter;
d88aee68 2304 struct lttng_ht_node_u64 *node;
fb3a43a9 2305 struct lttng_poll_event events;
97535efa 2306 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
fb3a43a9
DG
2307 ssize_t len;
2308
2309 rcu_register_thread();
2310
1fc79fb4
MD
2311 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2312
2d57de81
MD
2313 if (testpoint(consumerd_thread_metadata)) {
2314 goto error_testpoint;
2315 }
2316
9ce5646a
MD
2317 health_code_update();
2318
fb3a43a9
DG
2319 DBG("Thread metadata poll started");
2320
fb3a43a9
DG
2321 /* Size is set to 1 for the consumer_metadata pipe */
2322 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2323 if (ret < 0) {
2324 ERR("Poll set creation failed");
d8ef542d 2325 goto end_poll;
fb3a43a9
DG
2326 }
2327
28ab034a 2328 ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
fb3a43a9
DG
2329 if (ret < 0) {
2330 goto end;
2331 }
2332
2333 /* Main loop */
2334 DBG("Metadata main loop started");
2335
cd9adb8b 2336 while (true) {
28ab034a 2337 restart:
7fa2082e 2338 health_code_update();
9ce5646a 2339 health_poll_entry();
7fa2082e 2340 DBG("Metadata poll wait");
fb3a43a9 2341 ret = lttng_poll_wait(&events, -1);
28ab034a 2342 DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2343 health_poll_exit();
40063ead 2344 DBG("Metadata event caught in thread");
fb3a43a9
DG
2345 if (ret < 0) {
2346 if (errno == EINTR) {
40063ead 2347 ERR("Poll EINTR caught");
fb3a43a9
DG
2348 goto restart;
2349 }
d9607cd7 2350 if (LTTNG_POLL_GETNB(&events) == 0) {
28ab034a 2351 err = 0; /* All is OK */
d9607cd7
MD
2352 }
2353 goto end;
fb3a43a9
DG
2354 }
2355
0d9c5d77
DG
2356 nb_fd = ret;
2357
e316aad5 2358 /* From here, the event is a metadata wait fd */
fb3a43a9 2359 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2360 health_code_update();
2361
fb3a43a9
DG
2362 revents = LTTNG_POLL_GETEV(&events, i);
2363 pollfd = LTTNG_POLL_GETFD(&events, i);
2364
13886d2d 2365 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
03e43155 2366 if (revents & LPOLLIN) {
13886d2d
DG
2367 ssize_t pipe_len;
2368
2369 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
28ab034a 2370 &stream,
5c7248cd
JG
2371 sizeof(stream)); /* NOLINT sizeof
2372 used on a
2373 pointer. */
2374 if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
2375 pointer. */
03e43155
MD
2376 if (pipe_len < 0) {
2377 PERROR("read metadata stream");
2378 }
fb3a43a9 2379 /*
28ab034a
JG
2380 * Remove the pipe from the poll set and continue
2381 * the loop since their might be data to consume.
fb3a43a9 2382 */
28ab034a
JG
2383 lttng_poll_del(
2384 &events,
2385 lttng_pipe_get_readfd(
2386 ctx->consumer_metadata_pipe));
03e43155 2387 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
fb3a43a9
DG
2388 continue;
2389 }
2390
8994307f 2391 /* A NULL stream means that the state has changed. */
cd9adb8b 2392 if (stream == nullptr) {
8994307f
DG
2393 /* Check for deleted streams. */
2394 validate_endpoint_status_metadata_stream(&events);
3714380f 2395 goto restart;
8994307f
DG
2396 }
2397
fb3a43a9 2398 DBG("Adding metadata stream %d to poll set",
28ab034a 2399 stream->wait_fd);
fb3a43a9 2400
fb3a43a9 2401 /* Add metadata stream to the global poll events list */
28ab034a
JG
2402 lttng_poll_add(
2403 &events, stream->wait_fd, LPOLLIN | LPOLLPRI);
2404 } else if (revents & (LPOLLERR | LPOLLHUP)) {
03e43155
MD
2405 DBG("Metadata thread pipe hung up");
2406 /*
2407 * Remove the pipe from the poll set and continue the loop
2408 * since their might be data to consume.
2409 */
28ab034a
JG
2410 lttng_poll_del(
2411 &events,
2412 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
03e43155
MD
2413 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2414 continue;
2415 } else {
28ab034a
JG
2416 ERR("Unexpected poll events %u for sock %d",
2417 revents,
2418 pollfd);
03e43155 2419 goto end;
fb3a43a9
DG
2420 }
2421
e316aad5 2422 /* Handle other stream */
fb3a43a9
DG
2423 continue;
2424 }
2425
56047f5a 2426 lttng::urcu::read_lock_guard read_lock;
d88aee68
DG
2427 {
2428 uint64_t tmp_id = (uint64_t) pollfd;
2429
2430 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2431 }
2432 node = lttng_ht_iter_get_node_u64(&iter);
a0377dfe 2433 LTTNG_ASSERT(node);
fb3a43a9 2434
28ab034a 2435 stream = caa_container_of(node, struct lttng_consumer_stream, node);
fb3a43a9 2436
03e43155
MD
2437 if (revents & (LPOLLIN | LPOLLPRI)) {
2438 /* Get the data out of the metadata file descriptor */
2439 DBG("Metadata available on fd %d", pollfd);
a0377dfe 2440 LTTNG_ASSERT(stream->wait_fd == pollfd);
03e43155
MD
2441
2442 do {
2443 health_code_update();
2444
6f9449c2 2445 len = ctx->on_buffer_ready(stream, ctx, false);
03e43155
MD
2446 /*
2447 * We don't check the return value here since if we get
83f4233d 2448 * a negative len, it means an error occurred thus we
03e43155
MD
2449 * simply remove it from the poll set and free the
2450 * stream.
2451 */
2452 } while (len > 0);
2453
2454 /* It's ok to have an unavailable sub-buffer */
2455 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2456 /* Clean up stream from consumer and free it. */
2457 lttng_poll_del(&events, stream->wait_fd);
2458 consumer_del_metadata_stream(stream, metadata_ht);
2459 }
2460 } else if (revents & (LPOLLERR | LPOLLHUP)) {
e316aad5 2461 DBG("Metadata fd %d is hup|err.", pollfd);
fa29bfbf 2462 if (!stream->hangup_flush_done &&
28ab034a
JG
2463 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2464 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
fb3a43a9
DG
2465 DBG("Attempting to flush and consume the UST buffers");
2466 lttng_ustconsumer_on_stream_hangup(stream);
2467
2468 /* We just flushed the stream now read it. */
4bb94b75 2469 do {
9ce5646a
MD
2470 health_code_update();
2471
6f9449c2 2472 len = ctx->on_buffer_ready(stream, ctx, false);
4bb94b75 2473 /*
28ab034a
JG
2474 * We don't check the return value here since if we
2475 * get a negative len, it means an error occurred
2476 * thus we simply remove it from the poll set and
2477 * free the stream.
4bb94b75
DG
2478 */
2479 } while (len > 0);
fb3a43a9
DG
2480 }
2481
fb3a43a9 2482 lttng_poll_del(&events, stream->wait_fd);
e316aad5
DG
2483 /*
2484 * This call update the channel states, closes file descriptors
2485 * and securely free the stream.
2486 */
2487 consumer_del_metadata_stream(stream, metadata_ht);
03e43155
MD
2488 } else {
2489 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
03e43155 2490 goto end;
fb3a43a9 2491 }
e316aad5 2492 /* Release RCU lock for the stream looked up */
fb3a43a9
DG
2493 }
2494 }
2495
1fc79fb4
MD
2496 /* All is OK */
2497 err = 0;
fb3a43a9
DG
2498end:
2499 DBG("Metadata poll thread exiting");
fb3a43a9 2500
d8ef542d
MD
2501 lttng_poll_clean(&events);
2502end_poll:
2d57de81 2503error_testpoint:
1fc79fb4
MD
2504 if (err) {
2505 health_error();
2506 ERR("Health error occurred in %s", __func__);
2507 }
2508 health_unregister(health_consumerd);
fb3a43a9 2509 rcu_unregister_thread();
cd9adb8b 2510 return nullptr;
fb3a43a9
DG
2511}
2512
3bd1e081 2513/*
e4421fec 2514 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2515 * it to tracefile if necessary.
2516 */
7d980def 2517void *consumer_thread_data_poll(void *data)
3bd1e081 2518{
1fc79fb4 2519 int num_rdy, num_hup, high_prio, ret, i, err = -1;
cd9adb8b 2520 struct pollfd *pollfd = nullptr;
3bd1e081 2521 /* local view of the streams */
cd9adb8b 2522 struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
3bd1e081 2523 /* local view of consumer_data.fds_count */
8bdcc002
JG
2524 int nb_fd = 0;
2525 /* 2 for the consumer_data_pipe and wake up pipe */
2526 const int nb_pipes_fd = 2;
9a2fcf78
JD
2527 /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
2528 int nb_inactive_fd = 0;
97535efa 2529 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
00e2e675 2530 ssize_t len;
3bd1e081 2531
e7b994a3
DG
2532 rcu_register_thread();
2533
1fc79fb4
MD
2534 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2535
2d57de81
MD
2536 if (testpoint(consumerd_thread_data)) {
2537 goto error_testpoint;
2538 }
2539
9ce5646a
MD
2540 health_code_update();
2541
64803277 2542 local_stream = zmalloc<lttng_consumer_stream *>();
cd9adb8b 2543 if (local_stream == nullptr) {
4df6c8cb
MD
2544 PERROR("local_stream malloc");
2545 goto end;
2546 }
3bd1e081 2547
cd9adb8b 2548 while (true) {
9ce5646a
MD
2549 health_code_update();
2550
3bd1e081
MD
2551 high_prio = 0;
2552 num_hup = 0;
2553
2554 /*
e4421fec 2555 * the fds set has been updated, we need to update our
3bd1e081
MD
2556 * local array as well
2557 */
fa29bfbf
SM
2558 pthread_mutex_lock(&the_consumer_data.lock);
2559 if (the_consumer_data.need_update) {
0e428499 2560 free(pollfd);
cd9adb8b 2561 pollfd = nullptr;
0e428499
DG
2562
2563 free(local_stream);
cd9adb8b 2564 local_stream = nullptr;
3bd1e081 2565
8bdcc002 2566 /* Allocate for all fds */
28ab034a
JG
2567 pollfd =
2568 calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2569 if (pollfd == nullptr) {
7a57cf92 2570 PERROR("pollfd malloc");
fa29bfbf 2571 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2572 goto end;
2573 }
2574
28ab034a
JG
2575 local_stream = calloc<lttng_consumer_stream *>(
2576 the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2577 if (local_stream == nullptr) {
7a57cf92 2578 PERROR("local_stream malloc");
fa29bfbf 2579 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2580 goto end;
2581 }
28ab034a
JG
2582 ret = update_poll_array(
2583 ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
3bd1e081
MD
2584 if (ret < 0) {
2585 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2586 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
fa29bfbf 2587 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2588 goto end;
2589 }
2590 nb_fd = ret;
fa29bfbf 2591 the_consumer_data.need_update = 0;
3bd1e081 2592 }
fa29bfbf 2593 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081 2594
4078b776 2595 /* No FDs and consumer_quit, consumer_cleanup the thread */
28ab034a
JG
2596 if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
2597 err = 0; /* All is OK */
4078b776
MD
2598 goto end;
2599 }
3bd1e081 2600 /* poll on the array of fds */
88f2b785 2601 restart:
261de637 2602 DBG("polling on %d fd", nb_fd + nb_pipes_fd);
cf0bcb51
JG
2603 if (testpoint(consumerd_thread_data_poll)) {
2604 goto end;
2605 }
9ce5646a 2606 health_poll_entry();
261de637 2607 num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
9ce5646a 2608 health_poll_exit();
3bd1e081
MD
2609 DBG("poll num_rdy : %d", num_rdy);
2610 if (num_rdy == -1) {
88f2b785
MD
2611 /*
2612 * Restart interrupted system call.
2613 */
2614 if (errno == EINTR) {
2615 goto restart;
2616 }
7a57cf92 2617 PERROR("Poll error");
f73fabfd 2618 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2619 goto end;
2620 } else if (num_rdy == 0) {
2621 DBG("Polling thread timed out");
2622 goto end;
2623 }
2624
80957876
JG
2625 if (caa_unlikely(data_consumption_paused)) {
2626 DBG("Data consumption paused, sleeping...");
2627 sleep(1);
2628 goto restart;
2629 }
2630
3bd1e081 2631 /*
50f8ae69 2632 * If the consumer_data_pipe triggered poll go directly to the
00e2e675
DG
2633 * beginning of the loop to update the array. We want to prioritize
2634 * array update over low-priority reads.
3bd1e081 2635 */
509bb1cf 2636 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
ab30f567 2637 ssize_t pipe_readlen;
04fdd819 2638
50f8ae69 2639 DBG("consumer_data_pipe wake up");
5c7248cd
JG
2640 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2641 &new_stream,
2642 sizeof(new_stream)); /* NOLINT sizeof used on
2643 a pointer. */
2644 if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
2645 */
6cd525e8 2646 PERROR("Consumer data pipe");
23f5f35d
DG
2647 /* Continue so we can at least handle the current stream(s). */
2648 continue;
2649 }
c869f647
DG
2650
2651 /*
2652 * If the stream is NULL, just ignore it. It's also possible that
2653 * the sessiond poll thread changed the consumer_quit state and is
2654 * waking us up to test it.
2655 */
cd9adb8b 2656 if (new_stream == nullptr) {
8994307f 2657 validate_endpoint_status_data_stream();
c869f647
DG
2658 continue;
2659 }
2660
c869f647 2661 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2662 continue;
2663 }
2664
02b3d176
DG
2665 /* Handle wakeup pipe. */
2666 if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
2667 char dummy;
2668 ssize_t pipe_readlen;
2669
28ab034a
JG
2670 pipe_readlen =
2671 lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
02b3d176
DG
2672 if (pipe_readlen < 0) {
2673 PERROR("Consumer data wakeup pipe");
2674 }
2675 /* We've been awakened to handle stream(s). */
2676 ctx->has_wakeup = 0;
2677 }
2678
3bd1e081
MD
2679 /* Take care of high priority channels first. */
2680 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2681 health_code_update();
2682
cd9adb8b 2683 if (local_stream[i] == nullptr) {
9617607b
DG
2684 continue;
2685 }
fb3a43a9 2686 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2687 DBG("Urgent read on fd %d", pollfd[i].fd);
2688 high_prio = 1;
6f9449c2 2689 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
d41f73b7 2690 /* it's ok to have an unavailable sub-buffer */
b64403e3 2691 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2692 /* Clean the stream and free it. */
2693 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2694 local_stream[i] = nullptr;
4078b776 2695 } else if (len > 0) {
28ab034a
JG
2696 local_stream[i]->has_data_left_to_be_read_before_teardown =
2697 1;
d41f73b7 2698 }
3bd1e081
MD
2699 }
2700 }
2701
4078b776
MD
2702 /*
2703 * If we read high prio channel in this loop, try again
2704 * for more high prio data.
2705 */
2706 if (high_prio) {
3bd1e081
MD
2707 continue;
2708 }
2709
2710 /* Take care of low priority channels. */
4078b776 2711 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2712 health_code_update();
2713
cd9adb8b 2714 if (local_stream[i] == nullptr) {
9617607b
DG
2715 continue;
2716 }
28ab034a
JG
2717 if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
2718 local_stream[i]->has_data) {
4078b776 2719 DBG("Normal read on fd %d", pollfd[i].fd);
6f9449c2 2720 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
4078b776 2721 /* it's ok to have an unavailable sub-buffer */
b64403e3 2722 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2723 /* Clean the stream and free it. */
2724 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2725 local_stream[i] = nullptr;
4078b776 2726 } else if (len > 0) {
28ab034a
JG
2727 local_stream[i]->has_data_left_to_be_read_before_teardown =
2728 1;
4078b776
MD
2729 }
2730 }
2731 }
2732
2733 /* Handle hangup and errors */
2734 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2735 health_code_update();
2736
cd9adb8b 2737 if (local_stream[i] == nullptr) {
9617607b
DG
2738 continue;
2739 }
28ab034a
JG
2740 if (!local_stream[i]->hangup_flush_done &&
2741 (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
2742 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2743 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
4078b776 2744 DBG("fd %d is hup|err|nval. Attempting flush and read.",
28ab034a 2745 pollfd[i].fd);
4078b776
MD
2746 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2747 /* Attempt read again, for the data we just flushed. */
c715ddc9 2748 local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
4078b776
MD
2749 }
2750 /*
c715ddc9
JG
2751 * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
2752 * performed. This type of flush ensures that a new packet is produced no
2753 * matter the consumed/produced positions are.
2754 *
2755 * This, in turn, causes the next pass to see that data available for the
2756 * stream. When we come back here, we can be assured that all available
2757 * data has been consumed and we can finally destroy the stream.
2758 *
4078b776
MD
2759 * If the poll flag is HUP/ERR/NVAL and we have
2760 * read no data in this pass, we can remove the
2761 * stream from its hash table.
2762 */
2763 if ((pollfd[i].revents & POLLHUP)) {
2764 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
c715ddc9 2765 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2766 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2767 local_stream[i] = nullptr;
4078b776
MD
2768 num_hup++;
2769 }
2770 } else if (pollfd[i].revents & POLLERR) {
2771 ERR("Error returned in polling fd %d.", pollfd[i].fd);
c715ddc9 2772 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2773 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2774 local_stream[i] = nullptr;
4078b776
MD
2775 num_hup++;
2776 }
2777 } else if (pollfd[i].revents & POLLNVAL) {
2778 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
c715ddc9 2779 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2780 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2781 local_stream[i] = nullptr;
4078b776 2782 num_hup++;
3bd1e081
MD
2783 }
2784 }
cd9adb8b 2785 if (local_stream[i] != nullptr) {
c715ddc9 2786 local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
9617607b 2787 }
3bd1e081
MD
2788 }
2789 }
1fc79fb4
MD
2790 /* All is OK */
2791 err = 0;
3bd1e081
MD
2792end:
2793 DBG("polling thread exiting");
0e428499
DG
2794 free(pollfd);
2795 free(local_stream);
fb3a43a9
DG
2796
2797 /*
2798 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2799 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2800 * read side of the pipe. If we close them both, epoll_wait strangely does
2801 * not return and could create a endless wait period if the pipe is the
2802 * only tracked fd in the poll set. The thread will take care of closing
2803 * the read side.
fb3a43a9 2804 */
13886d2d 2805 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
fb3a43a9 2806
2d57de81 2807error_testpoint:
1fc79fb4
MD
2808 if (err) {
2809 health_error();
2810 ERR("Health error occurred in %s", __func__);
2811 }
2812 health_unregister(health_consumerd);
2813
e7b994a3 2814 rcu_unregister_thread();
cd9adb8b 2815 return nullptr;
3bd1e081
MD
2816}
2817
d8ef542d
MD
2818/*
2819 * Close wake-up end of each stream belonging to the channel. This will
2820 * allow the poll() on the stream read-side to detect when the
2821 * write-side (application) finally closes them.
2822 */
28ab034a 2823static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
d8ef542d
MD
2824{
2825 struct lttng_ht *ht;
2826 struct lttng_consumer_stream *stream;
2827 struct lttng_ht_iter iter;
2828
fa29bfbf 2829 ht = the_consumer_data.stream_per_chan_id_ht;
d8ef542d 2830
56047f5a 2831 lttng::urcu::read_lock_guard read_lock;
d8ef542d 2832 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
2833 ht->hash_fct(&channel->key, lttng_ht_seed),
2834 ht->match_fct,
2835 &channel->key,
2836 &iter.iter,
2837 stream,
2838 node_channel_id.node)
2839 {
f2ad556d
MD
2840 /*
2841 * Protect against teardown with mutex.
2842 */
2843 pthread_mutex_lock(&stream->lock);
2844 if (cds_lfht_is_node_deleted(&stream->node.node)) {
2845 goto next;
2846 }
fa29bfbf 2847 switch (the_consumer_data.type) {
d8ef542d
MD
2848 case LTTNG_CONSUMER_KERNEL:
2849 break;
2850 case LTTNG_CONSUMER32_UST:
2851 case LTTNG_CONSUMER64_UST:
b4a650f3
DG
2852 if (stream->metadata_flag) {
2853 /* Safe and protected by the stream lock. */
2854 lttng_ustconsumer_close_metadata(stream->chan);
2855 } else {
2856 /*
2857 * Note: a mutex is taken internally within
2858 * liblttng-ust-ctl to protect timer wakeup_fd
2859 * use from concurrent close.
2860 */
2861 lttng_ustconsumer_close_stream_wakeup(stream);
2862 }
d8ef542d
MD
2863 break;
2864 default:
2865 ERR("Unknown consumer_data type");
a0377dfe 2866 abort();
d8ef542d 2867 }
f2ad556d
MD
2868 next:
2869 pthread_mutex_unlock(&stream->lock);
d8ef542d 2870 }
d8ef542d
MD
2871}
2872
2873static void destroy_channel_ht(struct lttng_ht *ht)
2874{
2875 struct lttng_ht_iter iter;
2876 struct lttng_consumer_channel *channel;
2877 int ret;
2878
cd9adb8b 2879 if (ht == nullptr) {
d8ef542d
MD
2880 return;
2881 }
2882
56047f5a
JG
2883 {
2884 lttng::urcu::read_lock_guard read_lock;
2885
2886 cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
2887 ret = lttng_ht_del(ht, &iter);
2888 LTTNG_ASSERT(ret != 0);
2889 }
d8ef542d 2890 }
d8ef542d
MD
2891
2892 lttng_ht_destroy(ht);
2893}
2894
2895/*
2896 * This thread polls the channel fds to detect when they are being
2897 * closed. It closes all related streams if the channel is detected as
2898 * closed. It is currently only used as a shim layer for UST because the
2899 * consumerd needs to keep the per-stream wakeup end of pipes open for
2900 * periodical flush.
2901 */
2902void *consumer_thread_channel_poll(void *data)
2903{
1fc79fb4 2904 int ret, i, pollfd, err = -1;
d8ef542d 2905 uint32_t revents, nb_fd;
cd9adb8b 2906 struct lttng_consumer_channel *chan = nullptr;
d8ef542d
MD
2907 struct lttng_ht_iter iter;
2908 struct lttng_ht_node_u64 *node;
2909 struct lttng_poll_event events;
97535efa 2910 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
d8ef542d
MD
2911 struct lttng_ht *channel_ht;
2912
2913 rcu_register_thread();
2914
1fc79fb4
MD
2915 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
2916
2d57de81
MD
2917 if (testpoint(consumerd_thread_channel)) {
2918 goto error_testpoint;
2919 }
2920
9ce5646a
MD
2921 health_code_update();
2922
d8ef542d
MD
2923 channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2924 if (!channel_ht) {
2925 /* ENOMEM at this point. Better to bail out. */
2926 goto end_ht;
2927 }
2928
2929 DBG("Thread channel poll started");
2930
2931 /* Size is set to 1 for the consumer_channel pipe */
2932 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2933 if (ret < 0) {
2934 ERR("Poll set creation failed");
2935 goto end_poll;
2936 }
2937
2938 ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
2939 if (ret < 0) {
2940 goto end;
2941 }
2942
2943 /* Main loop */
2944 DBG("Channel main loop started");
2945
cd9adb8b 2946 while (true) {
28ab034a 2947 restart:
7fa2082e
MD
2948 health_code_update();
2949 DBG("Channel poll wait");
9ce5646a 2950 health_poll_entry();
d8ef542d 2951 ret = lttng_poll_wait(&events, -1);
28ab034a 2952 DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2953 health_poll_exit();
40063ead 2954 DBG("Channel event caught in thread");
d8ef542d
MD
2955 if (ret < 0) {
2956 if (errno == EINTR) {
40063ead 2957 ERR("Poll EINTR caught");
d8ef542d
MD
2958 goto restart;
2959 }
d9607cd7 2960 if (LTTNG_POLL_GETNB(&events) == 0) {
28ab034a 2961 err = 0; /* All is OK */
d9607cd7 2962 }
d8ef542d
MD
2963 goto end;
2964 }
2965
2966 nb_fd = ret;
2967
2968 /* From here, the event is a channel wait fd */
2969 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2970 health_code_update();
2971
d8ef542d
MD
2972 revents = LTTNG_POLL_GETEV(&events, i);
2973 pollfd = LTTNG_POLL_GETFD(&events, i);
2974
d8ef542d 2975 if (pollfd == ctx->consumer_channel_pipe[0]) {
03e43155 2976 if (revents & LPOLLIN) {
d8ef542d 2977 enum consumer_channel_action action;
a0cbdd2e 2978 uint64_t key;
d8ef542d 2979
a0cbdd2e 2980 ret = read_channel_pipe(ctx, &chan, &key, &action);
d8ef542d 2981 if (ret <= 0) {
03e43155
MD
2982 if (ret < 0) {
2983 ERR("Error reading channel pipe");
2984 }
28ab034a
JG
2985 lttng_poll_del(&events,
2986 ctx->consumer_channel_pipe[0]);
d8ef542d
MD
2987 continue;
2988 }
2989
2990 switch (action) {
2991 case CONSUMER_CHANNEL_ADD:
56047f5a 2992 {
28ab034a 2993 DBG("Adding channel %d to poll set", chan->wait_fd);
d8ef542d
MD
2994
2995 lttng_ht_node_init_u64(&chan->wait_fd_node,
28ab034a 2996 chan->wait_fd);
56047f5a 2997 lttng::urcu::read_lock_guard read_lock;
d8ef542d 2998 lttng_ht_add_unique_u64(channel_ht,
28ab034a 2999 &chan->wait_fd_node);
d8ef542d 3000 /* Add channel to the global poll events list */
28ab034a
JG
3001 // FIXME: Empty flag on a pipe pollset, this might
3002 // hang on FreeBSD.
1524f98c 3003 lttng_poll_add(&events, chan->wait_fd, 0);
d8ef542d 3004 break;
56047f5a 3005 }
a0cbdd2e
MD
3006 case CONSUMER_CHANNEL_DEL:
3007 {
b4a650f3 3008 /*
28ab034a
JG
3009 * This command should never be called if the
3010 * channel has streams monitored by either the data
3011 * or metadata thread. The consumer only notify this
3012 * thread with a channel del. command if it receives
3013 * a destroy channel command from the session daemon
3014 * that send it if a command prior to the
3015 * GET_CHANNEL failed.
b4a650f3
DG
3016 */
3017
56047f5a 3018 lttng::urcu::read_lock_guard read_lock;
a0cbdd2e
MD
3019 chan = consumer_find_channel(key);
3020 if (!chan) {
28ab034a
JG
3021 ERR("UST consumer get channel key %" PRIu64
3022 " not found for del channel",
3023 key);
a0cbdd2e
MD
3024 break;
3025 }
3026 lttng_poll_del(&events, chan->wait_fd);
f623cc0b 3027 iter.iter.node = &chan->wait_fd_node.node;
a0cbdd2e 3028 ret = lttng_ht_del(channel_ht, &iter);
a0377dfe 3029 LTTNG_ASSERT(ret == 0);
a0cbdd2e 3030
fa29bfbf 3031 switch (the_consumer_data.type) {
f2a444f1
DG
3032 case LTTNG_CONSUMER_KERNEL:
3033 break;
3034 case LTTNG_CONSUMER32_UST:
3035 case LTTNG_CONSUMER64_UST:
212d67a2 3036 health_code_update();
28ab034a
JG
3037 /* Destroy streams that might have been left
3038 * in the stream list. */
212d67a2 3039 clean_channel_stream_list(chan);
f2a444f1
DG
3040 break;
3041 default:
3042 ERR("Unknown consumer_data type");
a0377dfe 3043 abort();
f2a444f1
DG
3044 }
3045
a0cbdd2e 3046 /*
28ab034a
JG
3047 * Release our own refcount. Force channel deletion
3048 * even if streams were not initialized.
a0cbdd2e
MD
3049 */
3050 if (!uatomic_sub_return(&chan->refcount, 1)) {
3051 consumer_del_channel(chan);
3052 }
3053 goto restart;
3054 }
d8ef542d
MD
3055 case CONSUMER_CHANNEL_QUIT:
3056 /*
28ab034a
JG
3057 * Remove the pipe from the poll set and continue
3058 * the loop since their might be data to consume.
d8ef542d 3059 */
28ab034a
JG
3060 lttng_poll_del(&events,
3061 ctx->consumer_channel_pipe[0]);
d8ef542d
MD
3062 continue;
3063 default:
3064 ERR("Unknown action");
3065 break;
3066 }
03e43155
MD
3067 } else if (revents & (LPOLLERR | LPOLLHUP)) {
3068 DBG("Channel thread pipe hung up");
3069 /*
3070 * Remove the pipe from the poll set and continue the loop
3071 * since their might be data to consume.
3072 */
3073 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
3074 continue;
3075 } else {
28ab034a
JG
3076 ERR("Unexpected poll events %u for sock %d",
3077 revents,
3078 pollfd);
03e43155 3079 goto end;
d8ef542d
MD
3080 }
3081
3082 /* Handle other stream */
3083 continue;
3084 }
3085
56047f5a 3086 lttng::urcu::read_lock_guard read_lock;
d8ef542d
MD
3087 {
3088 uint64_t tmp_id = (uint64_t) pollfd;
3089
3090 lttng_ht_lookup(channel_ht, &tmp_id, &iter);
3091 }
3092 node = lttng_ht_iter_get_node_u64(&iter);
a0377dfe 3093 LTTNG_ASSERT(node);
d8ef542d 3094
28ab034a 3095 chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
d8ef542d
MD
3096
3097 /* Check for error event */
3098 if (revents & (LPOLLERR | LPOLLHUP)) {
3099 DBG("Channel fd %d is hup|err.", pollfd);
3100
3101 lttng_poll_del(&events, chan->wait_fd);
3102 ret = lttng_ht_del(channel_ht, &iter);
a0377dfe 3103 LTTNG_ASSERT(ret == 0);
b4a650f3
DG
3104
3105 /*
3106 * This will close the wait fd for each stream associated to
3107 * this channel AND monitored by the data/metadata thread thus
3108 * will be clean by the right thread.
3109 */
d8ef542d 3110 consumer_close_channel_streams(chan);
f2ad556d
MD
3111
3112 /* Release our own refcount */
28ab034a
JG
3113 if (!uatomic_sub_return(&chan->refcount, 1) &&
3114 !uatomic_read(&chan->nb_init_stream_left)) {
f2ad556d
MD
3115 consumer_del_channel(chan);
3116 }
03e43155
MD
3117 } else {
3118 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
03e43155 3119 goto end;
d8ef542d
MD
3120 }
3121
3122 /* Release RCU lock for the channel looked up */
d8ef542d
MD
3123 }
3124 }
3125
1fc79fb4
MD
3126 /* All is OK */
3127 err = 0;
d8ef542d
MD
3128end:
3129 lttng_poll_clean(&events);
3130end_poll:
3131 destroy_channel_ht(channel_ht);
3132end_ht:
2d57de81 3133error_testpoint:
d8ef542d 3134 DBG("Channel poll thread exiting");
1fc79fb4
MD
3135 if (err) {
3136 health_error();
3137 ERR("Health error occurred in %s", __func__);
3138 }
3139 health_unregister(health_consumerd);
d8ef542d 3140 rcu_unregister_thread();
cd9adb8b 3141 return nullptr;
d8ef542d
MD
3142}
3143
331744e3 3144static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
28ab034a
JG
3145 struct pollfd *sockpoll,
3146 int client_socket)
331744e3
JD
3147{
3148 int ret;
3149
a0377dfe
FD
3150 LTTNG_ASSERT(ctx);
3151 LTTNG_ASSERT(sockpoll);
331744e3 3152
84382d49
MD
3153 ret = lttng_consumer_poll_socket(sockpoll);
3154 if (ret) {
331744e3
JD
3155 goto error;
3156 }
3157 DBG("Metadata connection on client_socket");
3158
3159 /* Blocking call, waiting for transmission */
3160 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3161 if (ctx->consumer_metadata_socket < 0) {
3162 WARN("On accept metadata");
3163 ret = -1;
3164 goto error;
3165 }
3166 ret = 0;
3167
3168error:
3169 return ret;
3170}
3171
3bd1e081
MD
3172/*
3173 * This thread listens on the consumerd socket and receives the file
3174 * descriptors from the session daemon.
3175 */
7d980def 3176void *consumer_thread_sessiond_poll(void *data)
3bd1e081 3177{
1fc79fb4 3178 int sock = -1, client_socket, ret, err = -1;
3bd1e081
MD
3179 /*
3180 * structure to poll for incoming data on communication socket avoids
3181 * making blocking sockets.
3182 */
3183 struct pollfd consumer_sockpoll[2];
97535efa 3184 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
3bd1e081 3185
e7b994a3
DG
3186 rcu_register_thread();
3187
1fc79fb4
MD
3188 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
3189
2d57de81
MD
3190 if (testpoint(consumerd_thread_sessiond)) {
3191 goto error_testpoint;
3192 }
3193
9ce5646a
MD
3194 health_code_update();
3195
3bd1e081
MD
3196 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
3197 unlink(ctx->consumer_command_sock_path);
3198 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
3199 if (client_socket < 0) {
3200 ERR("Cannot create command socket");
3201 goto end;
3202 }
3203
3204 ret = lttcomm_listen_unix_sock(client_socket);
3205 if (ret < 0) {
3206 goto end;
3207 }
3208
32258573 3209 DBG("Sending ready command to lttng-sessiond");
f73fabfd 3210 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
3bd1e081
MD
3211 /* return < 0 on error, but == 0 is not fatal */
3212 if (ret < 0) {
32258573 3213 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
3214 goto end;
3215 }
3216
3bd1e081
MD
3217 /* prepare the FDs to poll : to client socket and the should_quit pipe */
3218 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
3219 consumer_sockpoll[0].events = POLLIN | POLLPRI;
3220 consumer_sockpoll[1].fd = client_socket;
3221 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3222
84382d49
MD
3223 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3224 if (ret) {
3225 if (ret > 0) {
3226 /* should exit */
3227 err = 0;
3228 }
3bd1e081
MD
3229 goto end;
3230 }
3231 DBG("Connection on client_socket");
3232
3233 /* Blocking call, waiting for transmission */
3234 sock = lttcomm_accept_unix_sock(client_socket);
534d2592 3235 if (sock < 0) {
3bd1e081
MD
3236 WARN("On accept");
3237 goto end;
3238 }
3bd1e081 3239
331744e3
JD
3240 /*
3241 * Setup metadata socket which is the second socket connection on the
3242 * command unix socket.
3243 */
3244 ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
84382d49
MD
3245 if (ret) {
3246 if (ret > 0) {
3247 /* should exit */
3248 err = 0;
3249 }
331744e3
JD
3250 goto end;
3251 }
3252
d96f09c6
DG
3253 /* This socket is not useful anymore. */
3254 ret = close(client_socket);
3255 if (ret < 0) {
3256 PERROR("close client_socket");
3257 }
3258 client_socket = -1;
3259
3bd1e081
MD
3260 /* update the polling structure to poll on the established socket */
3261 consumer_sockpoll[1].fd = sock;
3262 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3263
cd9adb8b 3264 while (true) {
9ce5646a
MD
3265 health_code_update();
3266
3267 health_poll_entry();
3268 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3269 health_poll_exit();
84382d49
MD
3270 if (ret) {
3271 if (ret > 0) {
3272 /* should exit */
3273 err = 0;
3274 }
3bd1e081
MD
3275 goto end;
3276 }
3277 DBG("Incoming command on sock");
3278 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
4cbc1a04
DG
3279 if (ret <= 0) {
3280 /*
3281 * This could simply be a session daemon quitting. Don't output
3282 * ERR() here.
3283 */
3284 DBG("Communication interrupted on command socket");
41ba6035 3285 err = 0;
3bd1e081
MD
3286 goto end;
3287 }
10211f5c 3288 if (CMM_LOAD_SHARED(consumer_quit)) {
3bd1e081 3289 DBG("consumer_thread_receive_fds received quit from signal");
28ab034a 3290 err = 0; /* All is OK */
3bd1e081
MD
3291 goto end;
3292 }
a1ed855a 3293 DBG("Received command on sock");
3bd1e081 3294 }
1fc79fb4
MD
3295 /* All is OK */
3296 err = 0;
3297
3bd1e081 3298end:
ffe60014 3299 DBG("Consumer thread sessiond poll exiting");
3bd1e081 3300
d88aee68
DG
3301 /*
3302 * Close metadata streams since the producer is the session daemon which
3303 * just died.
3304 *
3305 * NOTE: for now, this only applies to the UST tracer.
3306 */
6d574024 3307 lttng_consumer_close_all_metadata();
d88aee68 3308
3bd1e081
MD
3309 /*
3310 * when all fds have hung up, the polling thread
3311 * can exit cleanly
3312 */
10211f5c 3313 CMM_STORE_SHARED(consumer_quit, 1);
3bd1e081 3314
04fdd819 3315 /*
c869f647 3316 * Notify the data poll thread to poll back again and test the
8994307f 3317 * consumer_quit state that we just set so to quit gracefully.
04fdd819 3318 */
acdb9057 3319 notify_thread_lttng_pipe(ctx->consumer_data_pipe);
c869f647 3320
cd9adb8b 3321 notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);
d8ef542d 3322
5c635c72
MD
3323 notify_health_quit_pipe(health_quit_pipe);
3324
d96f09c6
DG
3325 /* Cleaning up possibly open sockets. */
3326 if (sock >= 0) {
3327 ret = close(sock);
3328 if (ret < 0) {
3329 PERROR("close sock sessiond poll");
3330 }
3331 }
3332 if (client_socket >= 0) {
38476d24 3333 ret = close(client_socket);
d96f09c6
DG
3334 if (ret < 0) {
3335 PERROR("close client_socket sessiond poll");
3336 }
3337 }
3338
2d57de81 3339error_testpoint:
1fc79fb4
MD
3340 if (err) {
3341 health_error();
3342 ERR("Health error occurred in %s", __func__);
3343 }
3344 health_unregister(health_consumerd);
3345
e7b994a3 3346 rcu_unregister_thread();
cd9adb8b 3347 return nullptr;
3bd1e081 3348}
d41f73b7 3349
503fefca 3350static int post_consume(struct lttng_consumer_stream *stream,
28ab034a
JG
3351 const struct stream_subbuffer *subbuffer,
3352 struct lttng_consumer_local_data *ctx)
f96af312 3353{
503fefca 3354 size_t i;
f96af312 3355 int ret = 0;
28ab034a
JG
3356 const size_t count =
3357 lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
f96af312 3358
503fefca
JG
3359 for (i = 0; i < count; i++) {
3360 const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
28ab034a 3361 &stream->read_subbuffer_ops.post_consume_cbs, i);
503fefca
JG
3362
3363 ret = op(stream, subbuffer, ctx);
3364 if (ret) {
3365 goto end;
f96af312 3366 }
f96af312 3367 }
f96af312
JG
3368end:
3369 return ret;
3370}
3371
4078b776 3372ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
28ab034a
JG
3373 struct lttng_consumer_local_data *ctx,
3374 bool locked_by_caller)
d41f73b7 3375{
12bddd1d 3376 ssize_t ret, written_bytes = 0;
23d56598 3377 int rotation_ret;
6f9449c2 3378 struct stream_subbuffer subbuffer = {};
b6797c8e 3379 enum get_next_subbuffer_status get_next_status;
74251bb8 3380
6f9449c2
JG
3381 if (!locked_by_caller) {
3382 stream->read_subbuffer_ops.lock(stream);
947bd097
JR
3383 } else {
3384 stream->read_subbuffer_ops.assert_locked(stream);
6f9449c2
JG
3385 }
3386
3387 if (stream->read_subbuffer_ops.on_wake_up) {
3388 ret = stream->read_subbuffer_ops.on_wake_up(stream);
3389 if (ret) {
3390 goto end;
3391 }
94d49140 3392 }
74251bb8 3393
23d56598
JG
3394 /*
3395 * If the stream was flagged to be ready for rotation before we extract
3396 * the next packet, rotate it now.
3397 */
3398 if (stream->rotate_ready) {
3399 DBG("Rotate stream before consuming data");
f46376a1 3400 ret = lttng_consumer_rotate_stream(stream);
23d56598
JG
3401 if (ret < 0) {
3402 ERR("Stream rotation error before consuming data");
3403 goto end;
3404 }
3405 }
3406
28ab034a 3407 get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
b6797c8e
JG
3408 switch (get_next_status) {
3409 case GET_NEXT_SUBBUFFER_STATUS_OK:
3410 break;
3411 case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
3412 /* Not an error. */
3413 ret = 0;
3414 goto sleep_stream;
3415 case GET_NEXT_SUBBUFFER_STATUS_ERROR:
3416 ret = -1;
6f9449c2 3417 goto end;
b6797c8e
JG
3418 default:
3419 abort();
d41f73b7 3420 }
74251bb8 3421
28ab034a 3422 ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
6f9449c2
JG
3423 if (ret) {
3424 goto error_put_subbuf;
3425 }
3426
28ab034a 3427 written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
514775d9
FD
3428 if (written_bytes <= 0) {
3429 ERR("Error consuming subbuffer: (%zd)", written_bytes);
3430 ret = (int) written_bytes;
3431 goto error_put_subbuf;
6f9449c2
JG
3432 }
3433
3434 ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
3435 if (ret) {
23d56598
JG
3436 goto end;
3437 }
3438
503fefca
JG
3439 ret = post_consume(stream, &subbuffer, ctx);
3440 if (ret) {
3441 goto end;
6f9449c2
JG
3442 }
3443
23d56598
JG
3444 /*
3445 * After extracting the packet, we check if the stream is now ready to
3446 * be rotated and perform the action immediately.
3447 *
3448 * Don't overwrite `ret` as callers expect the number of bytes
3449 * consumed to be returned on success.
3450 */
3451 rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
3452 if (rotation_ret == 1) {
f46376a1 3453 rotation_ret = lttng_consumer_rotate_stream(stream);
23d56598
JG
3454 if (rotation_ret < 0) {
3455 ret = rotation_ret;
3456 ERR("Stream rotation error after consuming data");
3457 goto end;
3458 }
503fefca 3459
23d56598
JG
3460 } else if (rotation_ret < 0) {
3461 ret = rotation_ret;
3462 ERR("Failed to check if stream was ready to rotate after consuming data");
3463 goto end;
3464 }
3465
82e72193 3466sleep_stream:
6f9449c2
JG
3467 if (stream->read_subbuffer_ops.on_sleep) {
3468 stream->read_subbuffer_ops.on_sleep(stream, ctx);
3469 }
3470
3471 ret = written_bytes;
23d56598 3472end:
6f9449c2
JG
3473 if (!locked_by_caller) {
3474 stream->read_subbuffer_ops.unlock(stream);
94d49140 3475 }
6f9449c2 3476
74251bb8 3477 return ret;
6f9449c2
JG
3478error_put_subbuf:
3479 (void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
3480 goto end;
d41f73b7
MD
3481}
3482
3483int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3484{
fa29bfbf 3485 switch (the_consumer_data.type) {
d41f73b7
MD
3486 case LTTNG_CONSUMER_KERNEL:
3487 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
3488 case LTTNG_CONSUMER32_UST:
3489 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
3490 return lttng_ustconsumer_on_recv_stream(stream);
3491 default:
3492 ERR("Unknown consumer_data type");
a0377dfe 3493 abort();
d41f73b7
MD
3494 return -ENOSYS;
3495 }
3496}
e4421fec
DG
3497
3498/*
3499 * Allocate and set consumer data hash tables.
3500 */
cd9adb8b 3501int lttng_consumer_init()
e4421fec 3502{
fa29bfbf
SM
3503 the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3504 if (!the_consumer_data.channel_ht) {
282dadbc
MD
3505 goto error;
3506 }
3507
28ab034a 3508 the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3509 if (!the_consumer_data.channels_by_session_id_ht) {
5c3892a6
JG
3510 goto error;
3511 }
3512
fa29bfbf
SM
3513 the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3514 if (!the_consumer_data.relayd_ht) {
282dadbc
MD
3515 goto error;
3516 }
3517
fa29bfbf
SM
3518 the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3519 if (!the_consumer_data.stream_list_ht) {
282dadbc
MD
3520 goto error;
3521 }
3522
28ab034a 3523 the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3524 if (!the_consumer_data.stream_per_chan_id_ht) {
282dadbc
MD
3525 goto error;
3526 }
3527
3528 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3529 if (!data_ht) {
3530 goto error;
3531 }
3532
3533 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3534 if (!metadata_ht) {
3535 goto error;
3536 }
3537
fa29bfbf
SM
3538 the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3539 if (!the_consumer_data.chunk_registry) {
28cc88f3
JG
3540 goto error;
3541 }
3542
282dadbc
MD
3543 return 0;
3544
3545error:
3546 return -1;
e4421fec 3547}
7735ef9e
DG
3548
3549/*
3550 * Process the ADD_RELAYD command receive by a consumer.
3551 *
3552 * This will create a relayd socket pair and add it to the relayd hash table.
3553 * The caller MUST acquire a RCU read side lock before calling it.
3554 */
4222116f 3555void consumer_add_relayd_socket(uint64_t net_seq_idx,
28ab034a
JG
3556 int sock_type,
3557 struct lttng_consumer_local_data *ctx,
3558 int sock,
3559 struct pollfd *consumer_sockpoll,
3560 uint64_t sessiond_id,
3561 uint64_t relayd_session_id,
3562 uint32_t relayd_version_major,
3563 uint32_t relayd_version_minor,
3564 enum lttcomm_sock_proto relayd_socket_protocol)
7735ef9e 3565{
cd2b09ed 3566 int fd = -1, ret = -1, relayd_created = 0;
0c759fc9 3567 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
cd9adb8b 3568 struct consumer_relayd_sock_pair *relayd = nullptr;
7735ef9e 3569
a0377dfe 3570 LTTNG_ASSERT(ctx);
4222116f 3571 LTTNG_ASSERT(sock >= 0);
48b7cdc2 3572 ASSERT_RCU_READ_LOCKED();
6151a90f 3573
da009f2c 3574 DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
7735ef9e
DG
3575
3576 /* Get relayd reference if exists. */
3577 relayd = consumer_find_relayd(net_seq_idx);
cd9adb8b 3578 if (relayd == nullptr) {
a0377dfe 3579 LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
7735ef9e
DG
3580 /* Not found. Allocate one. */
3581 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
cd9adb8b 3582 if (relayd == nullptr) {
618a6a28
MD
3583 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
3584 goto error;
0d08d75e 3585 } else {
30319bcb 3586 relayd->sessiond_session_id = sessiond_id;
0d08d75e 3587 relayd_created = 1;
7735ef9e 3588 }
0d08d75e
DG
3589
3590 /*
3591 * This code path MUST continue to the consumer send status message to
3592 * we can notify the session daemon and continue our work without
3593 * killing everything.
3594 */
da009f2c
MD
3595 } else {
3596 /*
3597 * relayd key should never be found for control socket.
3598 */
a0377dfe 3599 LTTNG_ASSERT(sock_type != LTTNG_STREAM_CONTROL);
0d08d75e
DG
3600 }
3601
3602 /* First send a status message before receiving the fds. */
0c759fc9 3603 ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
618a6a28 3604 if (ret < 0) {
0d08d75e 3605 /* Somehow, the session daemon is not responding anymore. */
618a6a28
MD
3606 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3607 goto error_nosignal;
7735ef9e
DG
3608 }
3609
3610 /* Poll on consumer socket. */
84382d49
MD
3611 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3612 if (ret) {
3613 /* Needing to exit in the middle of a command: error. */
0d08d75e 3614 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
618a6a28 3615 goto error_nosignal;
7735ef9e
DG
3616 }
3617
3618 /* Get relayd socket from session daemon */
3619 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
3620 if (ret != sizeof(fd)) {
28ab034a 3621 fd = -1; /* Just in case it gets set with an invalid value. */
0d08d75e
DG
3622
3623 /*
3624 * Failing to receive FDs might indicate a major problem such as
3625 * reaching a fd limit during the receive where the kernel returns a
3626 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
3627 * don't take any chances and stop everything.
3628 *
3629 * XXX: Feature request #558 will fix that and avoid this possible
3630 * issue when reaching the fd limit.
3631 */
3632 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
618a6a28 3633 ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
f50f23d9
DG
3634 goto error;
3635 }
3636
7735ef9e
DG
3637 /* Copy socket information and received FD */
3638 switch (sock_type) {
3639 case LTTNG_STREAM_CONTROL:
3640 /* Copy received lttcomm socket */
4222116f 3641 ret = lttcomm_populate_sock_from_open_socket(
28ab034a 3642 &relayd->control_sock.sock, fd, relayd_socket_protocol);
7735ef9e 3643
6151a90f 3644 /* Assign version values. */
4222116f
JR
3645 relayd->control_sock.major = relayd_version_major;
3646 relayd->control_sock.minor = relayd_version_minor;
c5b6f4f0 3647
d3e2ba59 3648 relayd->relayd_session_id = relayd_session_id;
c5b6f4f0 3649
7735ef9e
DG
3650 break;
3651 case LTTNG_STREAM_DATA:
3652 /* Copy received lttcomm socket */
4222116f 3653 ret = lttcomm_populate_sock_from_open_socket(
28ab034a 3654 &relayd->data_sock.sock, fd, relayd_socket_protocol);
6151a90f 3655 /* Assign version values. */
4222116f
JR
3656 relayd->data_sock.major = relayd_version_major;
3657 relayd->data_sock.minor = relayd_version_minor;
7735ef9e
DG
3658 break;
3659 default:
3660 ERR("Unknown relayd socket type (%d)", sock_type);
618a6a28 3661 ret_code = LTTCOMM_CONSUMERD_FATAL;
7735ef9e
DG
3662 goto error;
3663 }
3664
4222116f
JR
3665 if (ret < 0) {
3666 ret_code = LTTCOMM_CONSUMERD_FATAL;
3667 goto error;
3668 }
3669
d88aee68 3670 DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
28ab034a
JG
3671 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
3672 relayd->net_seq_idx,
3673 fd);
39d9954c
FD
3674 /*
3675 * We gave the ownership of the fd to the relayd structure. Set the
3676 * fd to -1 so we don't call close() on it in the error path below.
3677 */
3678 fd = -1;
7735ef9e 3679
618a6a28
MD
3680 /* We successfully added the socket. Send status back. */
3681 ret = consumer_send_status_msg(sock, ret_code);
3682 if (ret < 0) {
3683 /* Somehow, the session daemon is not responding anymore. */
3684 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3685 goto error_nosignal;
3686 }
3687
7735ef9e
DG
3688 /*
3689 * Add relayd socket pair to consumer data hashtable. If object already
3690 * exists or on error, the function gracefully returns.
3691 */
9276e5c8 3692 relayd->ctx = ctx;
d09e1200 3693 add_relayd(relayd);
7735ef9e
DG
3694
3695 /* All good! */
2527bf85 3696 return;
7735ef9e
DG
3697
3698error:
618a6a28
MD
3699 if (consumer_send_status_msg(sock, ret_code) < 0) {
3700 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3701 }
3702
3703error_nosignal:
4028eeb9
DG
3704 /* Close received socket if valid. */
3705 if (fd >= 0) {
3706 if (close(fd)) {
3707 PERROR("close received socket");
3708 }
3709 }
cd2b09ed
DG
3710
3711 if (relayd_created) {
cd2b09ed
DG
3712 free(relayd);
3713 }
7735ef9e 3714}
ca22feea 3715
f7079f67
DG
3716/*
3717 * Search for a relayd associated to the session id and return the reference.
3718 *
3719 * A rcu read side lock MUST be acquire before calling this function and locked
3720 * until the relayd object is no longer necessary.
3721 */
3722static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3723{
3724 struct lttng_ht_iter iter;
cd9adb8b 3725 struct consumer_relayd_sock_pair *relayd = nullptr;
f7079f67 3726
48b7cdc2
FD
3727 ASSERT_RCU_READ_LOCKED();
3728
f7079f67 3729 /* Iterate over all relayd since they are indexed by net_seq_idx. */
28ab034a 3730 cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
18261bd1
DG
3731 /*
3732 * Check by sessiond id which is unique here where the relayd session
3733 * id might not be when having multiple relayd.
3734 */
3735 if (relayd->sessiond_session_id == id) {
f7079f67 3736 /* Found the relayd. There can be only one per id. */
18261bd1 3737 goto found;
f7079f67
DG
3738 }
3739 }
3740
cd9adb8b 3741 return nullptr;
18261bd1
DG
3742
3743found:
f7079f67
DG
3744 return relayd;
3745}
3746
ca22feea
DG
/*
 * Check if for a given session id there is still data needed to be extract
 * from the buffers.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	/* Domain-specific (kernel vs UST) per-stream data-pending check, chosen below. */
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	/* Hold the RCU read lock and the consumer data lock for the whole check. */
	lttng::urcu::read_lock_guard read_lock;
	pthread_mutex_lock(&the_consumer_data.lock);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		abort();
	}

	/* Ease our life a bit */
	ht = the_consumer_data.stream_list_ht;

	/* First pass: check the local buffers of every stream of this session. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&id, lttng_ht_seed),
					  ht->match_fct,
					  &id,
					  &iter.iter,
					  stream,
					  node_session_id.node)
	{
		pthread_mutex_lock(&stream->lock);

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		pthread_mutex_unlock(&stream->lock);
	}

	/* Second pass: ask the relay daemon about in-flight network data. */
	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
		if (ret < 0) {
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			/* Communication error thus the relayd so no data pending. */
			goto data_not_pending;
		}

		cds_lfht_for_each_entry_duplicate(ht->ht,
						  ht->hash_fct(&id, lttng_ht_seed),
						  ht->match_fct,
						  &id,
						  &iter.iter,
						  stream,
						  node_session_id.node)
		{
			if (stream->metadata_flag) {
				ret = relayd_quiescent_control(&relayd->control_sock,
							       stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
							  stream->relayd_stream_id,
							  stream->next_net_seq_num - 1);
			}

			if (ret == 1) {
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_pending;
			} else if (ret < 0) {
				ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
				    relayd->net_seq_idx);
				lttng_consumer_cleanup_relayd(relayd);
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_not_pending;
			}
		}

		/* Send end command for data pending. */
		ret = relayd_end_data_pending(
			&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 1;
}
f50f23d9
DG
3885
/*
 * Send a ret code status message to the sessiond daemon.
 *
 * Return the sendmsg() return value.
 */
int consumer_send_status_msg(int sock, int ret_code)
{
	struct lttcomm_consumer_status_msg msg;

	/*
	 * Zero the whole message (including any padding bytes) before
	 * sending it over the socket.
	 */
	memset(&msg, 0, sizeof(msg));
	msg.ret_code = (lttcomm_return_code) ret_code;

	return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
ffe60014
DG
3900
3901/*
3902 * Send a channel status message to the sessiond daemon.
3903 *
3904 * Return the sendmsg() return value.
3905 */
28ab034a 3906int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
ffe60014
DG
3907{
3908 struct lttcomm_consumer_status_channel msg;
3909
a0377dfe 3910 LTTNG_ASSERT(sock >= 0);
ffe60014 3911
53efb85a 3912 memset(&msg, 0, sizeof(msg));
ffe60014 3913 if (!channel) {
0c759fc9 3914 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
ffe60014 3915 } else {
0c759fc9 3916 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ffe60014
DG
3917 msg.key = channel->key;
3918 msg.stream_count = channel->streams.count;
3919 }
3920
3921 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3922}
5c786ded 3923
d07ceecd 3924unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
28ab034a
JG
3925 unsigned long produced_pos,
3926 uint64_t nb_packets_per_stream,
3927 uint64_t max_sb_size)
5c786ded 3928{
d07ceecd 3929 unsigned long start_pos;
5c786ded 3930
d07ceecd 3931 if (!nb_packets_per_stream) {
28ab034a 3932 return consumed_pos; /* Grab everything */
d07ceecd 3933 }
1cbd136b 3934 start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
d07ceecd
MD
3935 start_pos -= max_sb_size * nb_packets_per_stream;
3936 if ((long) (start_pos - consumed_pos) < 0) {
28ab034a 3937 return consumed_pos; /* Grab everything */
d07ceecd
MD
3938 }
3939 return start_pos;
5c786ded 3940}
a1ae2ea5 3941
c1dcb8bb
JG
3942/* Stream lock must be held by the caller. */
3943static int sample_stream_positions(struct lttng_consumer_stream *stream,
28ab034a
JG
3944 unsigned long *produced,
3945 unsigned long *consumed)
c1dcb8bb
JG
3946{
3947 int ret;
3948
3949 ASSERT_LOCKED(stream->lock);
3950
3951 ret = lttng_consumer_sample_snapshot_positions(stream);
3952 if (ret < 0) {
3953 ERR("Failed to sample snapshot positions");
3954 goto end;
3955 }
3956
3957 ret = lttng_consumer_get_produced_snapshot(stream, produced);
3958 if (ret < 0) {
3959 ERR("Failed to sample produced position");
3960 goto end;
3961 }
3962
3963 ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
3964 if (ret < 0) {
3965 ERR("Failed to sample consumed position");
3966 goto end;
3967 }
3968
3969end:
3970 return ret;
3971}
3972
b99a8d42
JD
/*
 * Sample the rotate position for all the streams of a channel. If a stream
 * is already at the rotate position (produced == consumed), we flag it as
 * ready for rotation. The rotation of ready streams occurs after we have
 * replied to the session daemon that we have finished sampling the positions.
 * Must be called with RCU read-side lock held to ensure existence of channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
				  uint64_t key,
				  uint64_t relayd_id)
{
	int ret;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
	struct lttng_dynamic_array stream_rotation_positions;
	uint64_t next_chunk_id, stream_count = 0;
	enum lttng_trace_chunk_status chunk_status;
	const bool is_local_trace = relayd_id == -1ULL;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	bool rotating_to_new_chunk = true;
	/* Array of `struct lttng_consumer_stream *` */
	struct lttng_dynamic_pointer_array streams_packet_to_open;
	size_t stream_idx;

	ASSERT_RCU_READ_LOCKED();

	DBG("Consumer sample rotate position for channel %" PRIu64, key);

	lttng_dynamic_array_init(&stream_rotation_positions,
				 sizeof(struct relayd_stream_rotation_position),
				 nullptr);
	lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);

	lttng::urcu::read_lock_guard read_lock;

	pthread_mutex_lock(&channel->lock);
	LTTNG_ASSERT(channel->trace_chunk);
	chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret = -1;
		goto end_unlock_channel;
	}

	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		unsigned long produced_pos = 0, consumed_pos = 0;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		/*
		 * A stream still on the channel's current chunk means this
		 * rotation does not introduce a new trace chunk.
		 */
		if (stream->trace_chunk == stream->chan->trace_chunk) {
			rotating_to_new_chunk = false;
		}

		/*
		 * Do not flush a packet when rotating from a NULL trace
		 * chunk. The stream has no means to output data, and the prior
		 * rotation which rotated to NULL performed that side-effect
		 * already. No new data can be produced when a stream has no
		 * associated trace chunk (e.g. a stop followed by a rotate).
		 */
		if (stream->trace_chunk) {
			bool flush_active;

			if (stream->metadata_flag) {
				/*
				 * Don't produce an empty metadata packet,
				 * simply close the current one.
				 *
				 * Metadata is regenerated on every trace chunk
				 * switch; there is no concern that no data was
				 * produced.
				 */
				flush_active = true;
			} else {
				/*
				 * Only flush an empty packet if the "packet
				 * open" could not be performed on transition
				 * to a new trace chunk and no packets were
				 * consumed within the chunk's lifetime.
				 */
				if (stream->opened_packet_in_current_trace_chunk) {
					flush_active = true;
				} else {
					/*
					 * Stream could have been full at the
					 * time of rotation, but then have had
					 * no activity at all.
					 *
					 * It is important to flush a packet
					 * to prevent 0-length files from being
					 * produced as most viewers choke on
					 * them.
					 *
					 * Unfortunately viewers will not be
					 * able to know that tracing was active
					 * for this stream during this trace
					 * chunk's lifetime.
					 */
					ret = sample_stream_positions(
						stream, &produced_pos, &consumed_pos);
					if (ret) {
						goto end_unlock_stream;
					}

					/*
					 * Don't flush an empty packet if data
					 * was produced; it will be consumed
					 * before the rotation completes.
					 */
					flush_active = produced_pos != consumed_pos;
					if (!flush_active) {
						const char *trace_chunk_name;
						uint64_t trace_chunk_id;

						chunk_status = lttng_trace_chunk_get_name(
							stream->trace_chunk,
							&trace_chunk_name,
							nullptr);
						if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
							trace_chunk_name = "none";
						}

						/*
						 * Consumer trace chunks are
						 * never anonymous.
						 */
						chunk_status = lttng_trace_chunk_get_id(
							stream->trace_chunk, &trace_chunk_id);
						LTTNG_ASSERT(chunk_status ==
							     LTTNG_TRACE_CHUNK_STATUS_OK);

						DBG("Unable to open packet for stream during trace chunk's lifetime. "
						    "Flushing an empty packet to prevent an empty file from being created: "
						    "stream id = %" PRIu64
						    ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
						    stream->key,
						    trace_chunk_name,
						    trace_chunk_id);
					}
				}
			}

			/*
			 * Close the current packet before sampling the
			 * ring buffer positions.
			 */
			ret = consumer_stream_flush_buffer(stream, flush_active);
			if (ret < 0) {
				ERR("Failed to flush stream %" PRIu64 " during channel rotation",
				    stream->key);
				goto end_unlock_stream;
			}
		}

		ret = lttng_consumer_take_snapshot(stream);
		if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
			ERR("Failed to sample snapshot position during channel rotation");
			goto end_unlock_stream;
		}
		if (!ret) {
			ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
			if (ret < 0) {
				ERR("Failed to sample produced position during channel rotation");
				goto end_unlock_stream;
			}

			ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
			if (ret < 0) {
				ERR("Failed to sample consumed position during channel rotation");
				goto end_unlock_stream;
			}
		}
		/*
		 * Align produced position on the start-of-packet boundary of the first
		 * packet going into the next trace chunk.
		 */
		produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
		if (consumed_pos == produced_pos) {
			DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
			    stream->key,
			    produced_pos,
			    consumed_pos);
			stream->rotate_ready = true;
		} else {
			DBG("Different consumed and produced positions "
			    "for stream %" PRIu64 " produced = %lu consumed = %lu",
			    stream->key,
			    produced_pos,
			    consumed_pos);
		}
		/*
		 * The rotation position is based on the packet_seq_num of the
		 * packet following the last packet that was consumed for this
		 * stream, incremented by the offset between produced and
		 * consumed positions. This rotation position is a lower bound
		 * (inclusive) at which the next trace chunk starts. Since it
		 * is a lower bound, it is OK if the packet_seq_num does not
		 * correspond exactly to the same packet identified by the
		 * consumed_pos, which can happen in overwrite mode.
		 */
		if (stream->sequence_number_unavailable) {
			/*
			 * Rotation should never be performed on a session which
			 * interacts with a pre-2.8 lttng-modules, which does
			 * not implement packet sequence number.
			 */
			ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
			    stream->key);
			ret = -1;
			goto end_unlock_stream;
		}
		stream->rotate_position = stream->last_sequence_number + 1 +
			((produced_pos - consumed_pos) / stream->max_sb_size);
		DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
		    stream->key,
		    stream->rotate_position);

		if (!is_local_trace) {
			/*
			 * The relay daemon control protocol expects a rotation
			 * position as "the sequence number of the first packet
			 * _after_ the current trace chunk".
			 */
			const struct relayd_stream_rotation_position position = {
				.stream_id = stream->relayd_stream_id,
				.rotate_at_seq_num = stream->rotate_position,
			};

			ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
							      &position);
			if (ret) {
				ERR("Failed to allocate stream rotation position");
				goto end_unlock_stream;
			}
			stream_count++;
		}

		stream->opened_packet_in_current_trace_chunk = false;

		if (rotating_to_new_chunk && !stream->metadata_flag) {
			/*
			 * Attempt to flush an empty packet as close to the
			 * rotation point as possible. In the event where a
			 * stream remains inactive after the rotation point,
			 * this ensures that the new trace chunk has a
			 * beginning timestamp set at the begining of the
			 * trace chunk instead of only creating an empty
			 * packet when the trace chunk is stopped.
			 *
			 * This indicates to the viewers that the stream
			 * was being recorded, but more importantly it
			 * allows viewers to determine a useable trace
			 * intersection.
			 *
			 * This presents a problem in the case where the
			 * ring-buffer is completely full.
			 *
			 * Consider the following scenario:
			 *   - The consumption of data is slow (slow network,
			 *     for instance),
			 *   - The ring buffer is full,
			 *   - A rotation is initiated,
			 *     - The flush below does nothing (no space left to
			 *       open a new packet),
			 *   - The other streams rotate very soon, and new
			 *     data is produced in the new chunk,
			 *   - This stream completes its rotation long after the
			 *     rotation was initiated
			 *   - The session is stopped before any event can be
			 *     produced in this stream's buffers.
			 *
			 * The resulting trace chunk will have a single packet
			 * temporaly at the end of the trace chunk for this
			 * stream making the stream intersection more narrow
			 * than it should be.
			 *
			 * To work-around this, an empty flush is performed
			 * after the first consumption of a packet during a
			 * rotation if open_packet fails. The idea is that
			 * consuming a packet frees enough space to switch
			 * packets in this scenario and allows the tracer to
			 * "stamp" the beginning of the new trace chunk at the
			 * earliest possible point.
			 *
			 * The packet open is performed after the channel
			 * rotation to ensure that no attempt to open a packet
			 * is performed in a stream that has no active trace
			 * chunk.
			 */
			ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
								      stream);
			if (ret) {
				PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
				ret = -1;
				goto end_unlock_stream;
			}
		}

		pthread_mutex_unlock(&stream->lock);
	}
	stream = nullptr;

	if (!is_local_trace) {
		/* Push the sampled rotation positions to the relay daemon. */
		relayd = consumer_find_relayd(relayd_id);
		if (!relayd) {
			ERR("Failed to find relayd %" PRIu64, relayd_id);
			ret = -1;
			goto end_unlock_channel;
		}

		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_rotate_streams(&relayd->control_sock,
					    stream_count,
					    rotating_to_new_chunk ? &next_chunk_id : nullptr,
					    (const struct relayd_stream_rotation_position *)
						    stream_rotation_positions.buffer.data);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end_unlock_channel;
		}
	}

	/* Open a packet in every stream flagged above (see rationale there). */
	for (stream_idx = 0;
	     stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
	     stream_idx++) {
		enum consumer_stream_open_packet_status status;

		stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
			&streams_packet_to_open, stream_idx);

		pthread_mutex_lock(&stream->lock);
		status = consumer_stream_open_packet(stream);
		pthread_mutex_unlock(&stream->lock);
		switch (status) {
		case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
			DBG("Opened a packet after a rotation: stream id = %" PRIu64
			    ", channel name = %s, session id = %" PRIu64,
			    stream->key,
			    stream->chan->name,
			    stream->chan->session_id);
			break;
		case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
			/*
			 * Can't open a packet as there is no space left
			 * in the buffer. A new packet will be opened
			 * once one has been consumed.
			 */
			DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
			    ", channel name = %s, session id = %" PRIu64,
			    stream->key,
			    stream->chan->name,
			    stream->chan->session_id);
			break;
		case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
			/* Logged by callee. */
			ret = -1;
			goto end_unlock_channel;
		default:
			abort();
		}
	}

	pthread_mutex_unlock(&channel->lock);
	ret = 0;
	goto end;

end_unlock_stream:
	pthread_mutex_unlock(&stream->lock);
end_unlock_channel:
	pthread_mutex_unlock(&channel->lock);
end:
	lttng_dynamic_array_reset(&stream_rotation_positions);
	lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
	return ret;
}
4365
28ab034a 4366static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4367{
4368 int ret = 0;
4369 unsigned long consumed_pos_before, consumed_pos_after;
4370
4371 ret = lttng_consumer_sample_snapshot_positions(stream);
4372 if (ret < 0) {
4373 ERR("Taking snapshot positions");
4374 goto end;
4375 }
4376
4377 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4378 if (ret < 0) {
4379 ERR("Consumed snapshot position");
4380 goto end;
4381 }
4382
fa29bfbf 4383 switch (the_consumer_data.type) {
5f3aff8b
MD
4384 case LTTNG_CONSUMER_KERNEL:
4385 ret = kernctl_buffer_clear(stream->wait_fd);
4386 if (ret < 0) {
96393977 4387 ERR("Failed to clear kernel stream (ret = %d)", ret);
5f3aff8b
MD
4388 goto end;
4389 }
4390 break;
4391 case LTTNG_CONSUMER32_UST:
4392 case LTTNG_CONSUMER64_UST:
881fc67f
MD
4393 ret = lttng_ustconsumer_clear_buffer(stream);
4394 if (ret < 0) {
4395 ERR("Failed to clear ust stream (ret = %d)", ret);
4396 goto end;
4397 }
5f3aff8b
MD
4398 break;
4399 default:
4400 ERR("Unknown consumer_data type");
4401 abort();
4402 }
4403
4404 ret = lttng_consumer_sample_snapshot_positions(stream);
4405 if (ret < 0) {
4406 ERR("Taking snapshot positions");
4407 goto end;
4408 }
4409 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4410 if (ret < 0) {
4411 ERR("Consumed snapshot position");
4412 goto end;
4413 }
4414 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4415end:
4416 return ret;
4417}
4418
28ab034a 4419static int consumer_clear_stream(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4420{
4421 int ret;
4422
cd9adb8b 4423 ret = consumer_stream_flush_buffer(stream, true);
5f3aff8b 4424 if (ret < 0) {
28ab034a 4425 ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4426 ret = LTTCOMM_CONSUMERD_FATAL;
4427 goto error;
4428 }
4429
4430 ret = consumer_clear_buffer(stream);
4431 if (ret < 0) {
28ab034a 4432 ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4433 ret = LTTCOMM_CONSUMERD_FATAL;
4434 goto error;
4435 }
4436
4437 ret = LTTCOMM_CONSUMERD_SUCCESS;
4438error:
4439 return ret;
4440}
4441
28ab034a 4442static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
5f3aff8b
MD
4443{
4444 int ret;
4445 struct lttng_consumer_stream *stream;
4446
56047f5a 4447 lttng::urcu::read_lock_guard read_lock;
5f3aff8b 4448 pthread_mutex_lock(&channel->lock);
28ab034a 4449 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
5f3aff8b
MD
4450 health_code_update();
4451 pthread_mutex_lock(&stream->lock);
4452 ret = consumer_clear_stream(stream);
4453 if (ret) {
4454 goto error_unlock;
4455 }
4456 pthread_mutex_unlock(&stream->lock);
4457 }
4458 pthread_mutex_unlock(&channel->lock);
5f3aff8b
MD
4459 return 0;
4460
4461error_unlock:
4462 pthread_mutex_unlock(&stream->lock);
4463 pthread_mutex_unlock(&channel->lock);
5f3aff8b
MD
4464 return ret;
4465}
4466
02d02e31
JD
4467/*
4468 * Check if a stream is ready to be rotated after extracting it.
4469 *
4470 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4471 * error. Stream lock must be held.
4472 */
4473int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4474{
28ab034a
JG
4475 DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
4476 " last_sequence_number %" PRIu64,
4477 stream->key,
4478 stream->rotate_ready,
4479 stream->rotate_position,
4480 stream->last_sequence_number);
02d02e31 4481 if (stream->rotate_ready) {
a40a503f 4482 return 1;
02d02e31
JD
4483 }
4484
4485 /*
a40a503f
MD
4486 * If packet seq num is unavailable, it means we are interacting
4487 * with a pre-2.8 lttng-modules which does not implement the
4488 * sequence number. Rotation should never be used by sessiond in this
4489 * scenario.
02d02e31 4490 */
a40a503f
MD
4491 if (stream->sequence_number_unavailable) {
4492 ERR("Internal error: rotation used on stream %" PRIu64
28ab034a
JG
4493 " with unavailable sequence number",
4494 stream->key);
a40a503f 4495 return -1;
02d02e31
JD
4496 }
4497
28ab034a 4498 if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
a40a503f 4499 return 0;
02d02e31
JD
4500 }
4501
a40a503f
MD
4502 /*
4503 * Rotate position not reached yet. The stream rotate position is
4504 * the position of the next packet belonging to the next trace chunk,
4505 * but consumerd considers rotation ready when reaching the last
4506 * packet of the current chunk, hence the "rotate_position - 1".
4507 */
f8528c7a 4508
28ab034a
JG
4509 DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
4510 " rotate_position %" PRIu64,
4511 stream->key,
4512 stream->last_sequence_number,
4513 stream->rotate_position);
a40a503f
MD
4514 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4515 return 1;
02d02e31 4516 }
02d02e31 4517
a40a503f 4518 return 0;
02d02e31
JD
4519}
4520
d73bf3d7
JD
/*
 * Reset the state for a stream after a rotation occurred.
 */
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
{
	DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
	/* Clear the markers set while sampling the rotation position. */
	stream->rotate_position = -1ULL;
	stream->rotate_ready = false;
}
4530
4531/*
4532 * Perform the rotation a local stream file.
4533 */
28ab034a 4534static int rotate_local_stream(struct lttng_consumer_stream *stream)
d73bf3d7 4535{
d2956687 4536 int ret = 0;
d73bf3d7 4537
d2956687 4538 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
28ab034a
JG
4539 stream->key,
4540 stream->chan->key);
d73bf3d7 4541 stream->tracefile_size_current = 0;
d2956687 4542 stream->tracefile_count_current = 0;
d73bf3d7 4543
d2956687
JG
4544 if (stream->out_fd >= 0) {
4545 ret = close(stream->out_fd);
4546 if (ret) {
4547 PERROR("Failed to close stream out_fd of channel \"%s\"",
28ab034a 4548 stream->chan->name);
d2956687
JG
4549 }
4550 stream->out_fd = -1;
4551 }
d73bf3d7 4552
d2956687 4553 if (stream->index_file) {
d73bf3d7 4554 lttng_index_file_put(stream->index_file);
cd9adb8b 4555 stream->index_file = nullptr;
d73bf3d7
JD
4556 }
4557
d2956687
JG
4558 if (!stream->trace_chunk) {
4559 goto end;
4560 }
d73bf3d7 4561
d2956687 4562 ret = consumer_stream_create_output_files(stream, true);
d73bf3d7
JD
4563end:
4564 return ret;
d73bf3d7
JD
4565}
4566
d73bf3d7
JD
/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * Return 0 on success, a negative number of error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	DBG("Consumer rotate stream %" PRIu64, stream->key);

	/*
	 * Update the stream's 'current' chunk to the session's (channel)
	 * now-current chunk.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	if (stream->chan->trace_chunk == stream->trace_chunk) {
		/*
		 * A channel can be rotated and not have a "next" chunk
		 * to transition to. In that case, the channel's "current chunk"
		 * has not been closed yet, but it has not been updated to
		 * a "next" trace chunk either. Hence, the stream, like its
		 * parent channel, becomes part of no chunk and can't output
		 * anything until a new trace chunk is created.
		 */
		stream->trace_chunk = nullptr;
	} else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
		ret = -1;
		goto error;
	} else {
		/*
		 * Update the stream's trace chunk to its parent channel's
		 * current trace chunk.
		 */
		stream->trace_chunk = stream->chan->trace_chunk;
	}

	/* A net_seq_idx of -1ULL identifies a locally-written stream. */
	if (stream->net_seq_idx == (uint64_t) -1ULL) {
		ret = rotate_local_stream(stream);
		if (ret < 0) {
			ERR("Failed to rotate stream, ret = %i", ret);
			goto error;
		}
	}

	if (stream->metadata_flag && stream->trace_chunk) {
		/*
		 * If the stream has transitioned to a new trace
		 * chunk, the metadata should be re-dumped to the
		 * newest chunk.
		 *
		 * However, it is possible for a stream to transition to
		 * a "no-chunk" state. This can happen if a rotation
		 * occurs on an inactive session. In such cases, the metadata
		 * regeneration will happen when the next trace chunk is
		 * created.
		 */
		ret = consumer_metadata_stream_dump(stream);
		if (ret) {
			goto error;
		}
	}
	lttng_consumer_reset_stream_rotate_state(stream);

	ret = 0;

error:
	return ret;
}
4638
b99a8d42
JD
4639/*
4640 * Rotate all the ready streams now.
4641 *
4642 * This is especially important for low throughput streams that have already
4643 * been consumed, we cannot wait for their next packet to perform the
4644 * rotation.
92b7a7f8
MD
4645 * Need to be called with RCU read-side lock held to ensure existence of
4646 * channel.
b99a8d42
JD
4647 *
4648 * Returns 0 on success, < 0 on error
4649 */
28ab034a 4650int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
b99a8d42
JD
4651{
4652 int ret;
b99a8d42
JD
4653 struct lttng_consumer_stream *stream;
4654 struct lttng_ht_iter iter;
fa29bfbf 4655 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
b99a8d42 4656
48b7cdc2
FD
4657 ASSERT_RCU_READ_LOCKED();
4658
56047f5a 4659 lttng::urcu::read_lock_guard read_lock;
b99a8d42
JD
4660
4661 DBG("Consumer rotate ready streams in channel %" PRIu64, key);
4662
b99a8d42 4663 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
4664 ht->hash_fct(&channel->key, lttng_ht_seed),
4665 ht->match_fct,
4666 &channel->key,
4667 &iter.iter,
4668 stream,
4669 node_channel_id.node)
4670 {
b99a8d42
JD
4671 health_code_update();
4672
d2956687 4673 pthread_mutex_lock(&stream->chan->lock);
b99a8d42
JD
4674 pthread_mutex_lock(&stream->lock);
4675
4676 if (!stream->rotate_ready) {
4677 pthread_mutex_unlock(&stream->lock);
d2956687 4678 pthread_mutex_unlock(&stream->chan->lock);
b99a8d42
JD
4679 continue;
4680 }
4681 DBG("Consumer rotate ready stream %" PRIu64, stream->key);
4682
f46376a1 4683 ret = lttng_consumer_rotate_stream(stream);
b99a8d42 4684 pthread_mutex_unlock(&stream->lock);
d2956687 4685 pthread_mutex_unlock(&stream->chan->lock);
b99a8d42
JD
4686 if (ret) {
4687 goto end;
4688 }
4689 }
4690
4691 ret = 0;
4692
4693end:
b99a8d42
JD
4694 return ret;
4695}
4696
28ab034a
JG
4697enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
4698 const lttng_uuid& sessiond_uuid)
00fb02ac 4699{
d2956687 4700 enum lttcomm_return_code ret;
c70636a7 4701 char uuid_str[LTTNG_UUID_STR_LEN];
00fb02ac 4702
d2956687
JG
4703 if (ctx->sessiond_uuid.is_set) {
4704 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
00fb02ac
JD
4705 goto end;
4706 }
4707
d2956687 4708 ctx->sessiond_uuid.is_set = true;
328c2fe7 4709 ctx->sessiond_uuid.value = sessiond_uuid;
d2956687
JG
4710 ret = LTTCOMM_CONSUMERD_SUCCESS;
4711 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4712 DBG("Received session daemon UUID: %s", uuid_str);
00fb02ac
JD
4713end:
4714 return ret;
4715}
4716
28ab034a
JG
4717enum lttcomm_return_code
4718lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
4719 uint64_t session_id,
4720 uint64_t chunk_id,
4721 time_t chunk_creation_timestamp,
4722 const char *chunk_override_name,
4723 const struct lttng_credentials *credentials,
4724 struct lttng_directory_handle *chunk_directory_handle)
00fb02ac
JD
4725{
4726 int ret;
d2956687 4727 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
cd9adb8b 4728 struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
d2956687
JG
4729 enum lttng_trace_chunk_status chunk_status;
4730 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
4731 char creation_timestamp_buffer[ISO8601_STR_LEN];
4732 const char *relayd_id_str = "(none)";
4733 const char *creation_timestamp_str;
4734 struct lttng_ht_iter iter;
4735 struct lttng_consumer_channel *channel;
92816cc3 4736
d2956687
JG
4737 if (relayd_id) {
4738 /* Only used for logging purposes. */
28ab034a 4739 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
4740 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
4741 relayd_id_str = relayd_id_buffer;
4742 } else {
4743 relayd_id_str = "(formatting error)";
4744 }
d01ef216 4745 }
d2956687 4746
d01ef216 4747 /* Local protocol error. */
a0377dfe 4748 LTTNG_ASSERT(chunk_creation_timestamp);
d2956687 4749 ret = time_to_iso8601_str(chunk_creation_timestamp,
28ab034a
JG
4750 creation_timestamp_buffer,
4751 sizeof(creation_timestamp_buffer));
4752 creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";
d2956687
JG
4753
4754 DBG("Consumer create trace chunk command: relay_id = %s"
28ab034a
JG
4755 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
4756 ", chunk_creation_timestamp = %s",
4757 relayd_id_str,
4758 session_id,
4759 chunk_id,
4760 chunk_override_name ?: "(none)",
4761 creation_timestamp_str);
92816cc3
JG
4762
4763 /*
d2956687
JG
4764 * The trace chunk registry, as used by the consumer daemon, implicitly
4765 * owns the trace chunks. This is only needed in the consumer since
4766 * the consumer has no notion of a session beyond session IDs being
4767 * used to identify other objects.
4768 *
4769 * The lttng_trace_chunk_registry_publish() call below provides a
4770 * reference which is not released; it implicitly becomes the session
4771 * daemon's reference to the chunk in the consumer daemon.
4772 *
4773 * The lifetime of trace chunks in the consumer daemon is managed by
4774 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
4775 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
92816cc3 4776 */
cd9adb8b 4777 created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
d2956687
JG
4778 if (!created_chunk) {
4779 ERR("Failed to create trace chunk");
4780 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4781 goto error;
d2956687 4782 }
92816cc3 4783
d2956687 4784 if (chunk_override_name) {
28ab034a 4785 chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
d2956687
JG
4786 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4787 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4788 goto error;
92816cc3
JG
4789 }
4790 }
4791
d2956687 4792 if (chunk_directory_handle) {
28ab034a 4793 chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
d2956687
JG
4794 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4795 ERR("Failed to set trace chunk credentials");
4796 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4797 goto error;
d2956687
JG
4798 }
4799 /*
4800 * The consumer daemon has no ownership of the chunk output
4801 * directory.
4802 */
28ab034a 4803 chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
cd9adb8b 4804 chunk_directory_handle = nullptr;
d2956687
JG
4805 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4806 ERR("Failed to set trace chunk's directory handle");
4807 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4808 goto error;
92816cc3
JG
4809 }
4810 }
4811
d2956687 4812 published_chunk = lttng_trace_chunk_registry_publish_chunk(
28ab034a 4813 the_consumer_data.chunk_registry, session_id, created_chunk);
d2956687 4814 lttng_trace_chunk_put(created_chunk);
cd9adb8b 4815 created_chunk = nullptr;
d2956687
JG
4816 if (!published_chunk) {
4817 ERR("Failed to publish trace chunk");
4818 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4819 goto error;
d88744a4
JD
4820 }
4821
28ab034a 4822 {
56047f5a
JG
4823 lttng::urcu::read_lock_guard read_lock;
4824 cds_lfht_for_each_entry_duplicate(
4825 the_consumer_data.channels_by_session_id_ht->ht,
4826 the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
4827 lttng_ht_seed),
4828 the_consumer_data.channels_by_session_id_ht->match_fct,
4829 &session_id,
4830 &iter.iter,
4831 channel,
4832 channels_by_session_id_ht_node.node)
4833 {
4834 ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
4835 if (ret) {
4836 /*
4837 * Roll-back the creation of this chunk.
4838 *
4839 * This is important since the session daemon will
4840 * assume that the creation of this chunk failed and
4841 * will never ask for it to be closed, resulting
4842 * in a leak and an inconsistent state for some
4843 * channels.
4844 */
4845 enum lttcomm_return_code close_ret;
4846 char path[LTTNG_PATH_MAX];
4847
4848 DBG("Failed to set new trace chunk on existing channels, rolling back");
4849 close_ret =
4850 lttng_consumer_close_trace_chunk(relayd_id,
4851 session_id,
4852 chunk_id,
4853 chunk_creation_timestamp,
4854 nullptr,
4855 path);
4856 if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
4857 ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
4858 ", chunk_id = %" PRIu64,
4859 session_id,
4860 chunk_id);
4861 }
d2956687 4862
56047f5a
JG
4863 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
4864 break;
d2956687 4865 }
d2956687 4866 }
a1ae2ea5
JD
4867 }
4868
e5add6d0
JG
4869 if (relayd_id) {
4870 struct consumer_relayd_sock_pair *relayd;
4871
4872 relayd = consumer_find_relayd(*relayd_id);
4873 if (relayd) {
4874 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 4875 ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
e5add6d0
JG
4876 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4877 } else {
4878 ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
4879 }
4880
4881 if (!relayd || ret) {
4882 enum lttcomm_return_code close_ret;
ecd1a12f 4883 char path[LTTNG_PATH_MAX];
e5add6d0
JG
4884
4885 close_ret = lttng_consumer_close_trace_chunk(relayd_id,
28ab034a
JG
4886 session_id,
4887 chunk_id,
4888 chunk_creation_timestamp,
cd9adb8b 4889 nullptr,
28ab034a 4890 path);
e5add6d0 4891 if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
28ab034a
JG
4892 ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
4893 ", chunk_id = %" PRIu64,
4894 session_id,
4895 chunk_id);
e5add6d0
JG
4896 }
4897
4898 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4899 goto error_unlock;
e5add6d0
JG
4900 }
4901 }
7ea24db3 4902error_unlock:
7ea24db3 4903error:
d2956687
JG
4904 /* Release the reference returned by the "publish" operation. */
4905 lttng_trace_chunk_put(published_chunk);
9bb5f1f8 4906 lttng_trace_chunk_put(created_chunk);
d2956687 4907 return ret_code;
a1ae2ea5
JD
4908}
4909
28ab034a
JG
4910enum lttcomm_return_code
4911lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
4912 uint64_t session_id,
4913 uint64_t chunk_id,
4914 time_t chunk_close_timestamp,
4915 const enum lttng_trace_chunk_command_type *close_command,
4916 char *path)
a1ae2ea5 4917{
d2956687
JG
4918 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
4919 struct lttng_trace_chunk *chunk;
4920 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
4921 const char *relayd_id_str = "(none)";
bbc4768c 4922 const char *close_command_name = "none";
d2956687
JG
4923 struct lttng_ht_iter iter;
4924 struct lttng_consumer_channel *channel;
4925 enum lttng_trace_chunk_status chunk_status;
a1ae2ea5 4926
d2956687
JG
4927 if (relayd_id) {
4928 int ret;
4929
4930 /* Only used for logging purposes. */
28ab034a 4931 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
4932 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
4933 relayd_id_str = relayd_id_buffer;
4934 } else {
4935 relayd_id_str = "(formatting error)";
4936 }
bbc4768c
JG
4937 }
4938 if (close_command) {
28ab034a 4939 close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
bbc4768c 4940 }
d2956687
JG
4941
4942 DBG("Consumer close trace chunk command: relayd_id = %s"
28ab034a
JG
4943 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
4944 relayd_id_str,
4945 session_id,
4946 chunk_id,
4947 close_command_name);
bbc4768c 4948
d2956687 4949 chunk = lttng_trace_chunk_registry_find_chunk(
28ab034a 4950 the_consumer_data.chunk_registry, session_id, chunk_id);
bbc4768c 4951 if (!chunk) {
28ab034a
JG
4952 ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
4953 session_id,
4954 chunk_id);
d2956687 4955 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
a1ae2ea5
JD
4956 goto end;
4957 }
4958
28ab034a 4959 chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
d2956687
JG
4960 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4961 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4962 goto end;
45f1d9a1 4963 }
bbc4768c
JG
4964
4965 if (close_command) {
28ab034a 4966 chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
bbc4768c
JG
4967 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4968 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4969 goto end;
4970 }
4971 }
a1ae2ea5 4972
d2956687
JG
4973 /*
4974 * chunk is now invalid to access as we no longer hold a reference to
4975 * it; it is only kept around to compare it (by address) to the
4976 * current chunk found in the session's channels.
4977 */
56047f5a
JG
4978 {
4979 lttng::urcu::read_lock_guard read_lock;
4980 cds_lfht_for_each_entry (
4981 the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
4982 int ret;
a1ae2ea5 4983
d2956687 4984 /*
56047f5a
JG
4985 * Only change the channel's chunk to NULL if it still
4986 * references the chunk being closed. The channel may
4987 * reference a newer channel in the case of a session
4988 * rotation. When a session rotation occurs, the "next"
4989 * chunk is created before the "current" chunk is closed.
d2956687 4990 */
56047f5a
JG
4991 if (channel->trace_chunk != chunk) {
4992 continue;
4993 }
4994 ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
4995 if (ret) {
4996 /*
4997 * Attempt to close the chunk on as many channels as
4998 * possible.
4999 */
5000 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
5001 }
d2956687 5002 }
a1ae2ea5 5003 }
bbc4768c
JG
5004 if (relayd_id) {
5005 int ret;
5006 struct consumer_relayd_sock_pair *relayd;
5007
5008 relayd = consumer_find_relayd(*relayd_id);
5009 if (relayd) {
5010 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 5011 ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
bbc4768c
JG
5012 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
5013 } else {
28ab034a 5014 ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
bbc4768c
JG
5015 }
5016
5017 if (!relayd || ret) {
5018 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
5019 goto error_unlock;
5020 }
5021 }
5022error_unlock:
d2956687 5023end:
bbc4768c
JG
5024 /*
5025 * Release the reference returned by the "find" operation and
5026 * the session daemon's implicit reference to the chunk.
5027 */
5028 lttng_trace_chunk_put(chunk);
5029 lttng_trace_chunk_put(chunk);
5030
d2956687 5031 return ret_code;
a1ae2ea5 5032}
3654ed19 5033
28ab034a
JG
5034enum lttcomm_return_code
5035lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
3654ed19 5036{
c35f9726 5037 int ret;
d2956687 5038 enum lttcomm_return_code ret_code;
d2956687
JG
5039 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
5040 const char *relayd_id_str = "(none)";
c35f9726 5041 const bool is_local_trace = !relayd_id;
cd9adb8b 5042 struct consumer_relayd_sock_pair *relayd = nullptr;
6b584c2e 5043 bool chunk_exists_local, chunk_exists_remote;
56047f5a 5044 lttng::urcu::read_lock_guard read_lock;
d2956687
JG
5045
5046 if (relayd_id) {
d2956687 5047 /* Only used for logging purposes. */
28ab034a 5048 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
5049 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
5050 relayd_id_str = relayd_id_buffer;
5051 } else {
5052 relayd_id_str = "(formatting error)";
5053 }
d01ef216 5054 }
d2956687
JG
5055
5056 DBG("Consumer trace chunk exists command: relayd_id = %s"
28ab034a
JG
5057 ", chunk_id = %" PRIu64,
5058 relayd_id_str,
5059 chunk_id);
6b584c2e 5060 ret = lttng_trace_chunk_registry_chunk_exists(
28ab034a 5061 the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
6b584c2e
JG
5062 if (ret) {
5063 /* Internal error. */
5064 ERR("Failed to query the existence of a trace chunk");
5065 ret_code = LTTCOMM_CONSUMERD_FATAL;
13e3b280 5066 goto end;
6b584c2e 5067 }
28ab034a 5068 DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
6b584c2e 5069 if (chunk_exists_local) {
c35f9726 5070 ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
c35f9726
JG
5071 goto end;
5072 } else if (is_local_trace) {
5073 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5074 goto end;
5075 }
5076
c35f9726
JG
5077 relayd = consumer_find_relayd(*relayd_id);
5078 if (!relayd) {
5079 ERR("Failed to find relayd %" PRIu64, *relayd_id);
5080 ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
5081 goto end_rcu_unlock;
5082 }
5083 DBG("Looking up existence of trace chunk on relay daemon");
5084 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 5085 ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
c35f9726
JG
5086 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
5087 if (ret < 0) {
5088 ERR("Failed to look-up the existence of trace chunk on relay daemon");
5089 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
5090 goto end_rcu_unlock;
5091 }
5092
28ab034a
JG
5093 ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
5094 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5095 DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
d2956687 5096
c35f9726 5097end_rcu_unlock:
c35f9726 5098end:
d2956687 5099 return ret_code;
3654ed19 5100}
5f3aff8b 5101
28ab034a 5102static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
5f3aff8b
MD
5103{
5104 struct lttng_ht *ht;
5105 struct lttng_consumer_stream *stream;
5106 struct lttng_ht_iter iter;
5107 int ret;
5108
fa29bfbf 5109 ht = the_consumer_data.stream_per_chan_id_ht;
5f3aff8b 5110
56047f5a 5111 lttng::urcu::read_lock_guard read_lock;
5f3aff8b 5112 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
5113 ht->hash_fct(&channel->key, lttng_ht_seed),
5114 ht->match_fct,
5115 &channel->key,
5116 &iter.iter,
5117 stream,
5118 node_channel_id.node)
5119 {
5f3aff8b
MD
5120 /*
5121 * Protect against teardown with mutex.
5122 */
5123 pthread_mutex_lock(&stream->lock);
5124 if (cds_lfht_is_node_deleted(&stream->node.node)) {
5125 goto next;
5126 }
5127 ret = consumer_clear_stream(stream);
5128 if (ret) {
5129 goto error_unlock;
5130 }
5131 next:
5132 pthread_mutex_unlock(&stream->lock);
5133 }
5f3aff8b
MD
5134 return LTTCOMM_CONSUMERD_SUCCESS;
5135
5136error_unlock:
5137 pthread_mutex_unlock(&stream->lock);
5f3aff8b
MD
5138 return ret;
5139}
5140
5141int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
5142{
5143 int ret;
5144
5145 DBG("Consumer clear channel %" PRIu64, channel->key);
5146
5147 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
5148 /*
5149 * Nothing to do for the metadata channel/stream.
5150 * Snapshot mechanism already take care of the metadata
5151 * handling/generation, and monitored channels only need to
5152 * have their data stream cleared..
5153 */
5154 ret = LTTCOMM_CONSUMERD_SUCCESS;
5155 goto end;
5156 }
5157
5158 if (!channel->monitor) {
5159 ret = consumer_clear_unmonitored_channel(channel);
5160 } else {
5161 ret = consumer_clear_monitored_channel(channel);
5162 }
5163end:
5164 return ret;
5165}
04ed9e10 5166
28ab034a 5167enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
04ed9e10
JG
5168{
5169 struct lttng_consumer_stream *stream;
5170 enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
5171
5172 if (channel->metadata_stream) {
5173 ERR("Open channel packets command attempted on a metadata channel");
5174 ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
5175 goto end;
5176 }
5177
56047f5a
JG
5178 {
5179 lttng::urcu::read_lock_guard read_lock;
5180 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
5181 enum consumer_stream_open_packet_status status;
04ed9e10 5182
56047f5a
JG
5183 pthread_mutex_lock(&stream->lock);
5184 if (cds_lfht_is_node_deleted(&stream->node.node)) {
5185 goto next;
5186 }
04ed9e10 5187
56047f5a
JG
5188 status = consumer_stream_open_packet(stream);
5189 switch (status) {
5190 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
5191 DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
5192 ", channel name = %s, session id = %" PRIu64,
5193 stream->key,
5194 stream->chan->name,
5195 stream->chan->session_id);
5196 stream->opened_packet_in_current_trace_chunk = true;
5197 break;
5198 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
5199 DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
5200 ", channel name = %s, session id = %" PRIu64,
5201 stream->key,
5202 stream->chan->name,
5203 stream->chan->session_id);
5204 break;
5205 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
5206 /*
5207 * Only unexpected internal errors can lead to this
5208 * failing. Report an unknown error.
5209 */
5210 ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
5211 ", channel id = %" PRIu64 ", channel name = %s"
5212 ", session id = %" PRIu64,
5213 stream->key,
5214 channel->key,
5215 channel->name,
5216 channel->session_id);
5217 ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
5218 goto error_unlock;
5219 default:
5220 abort();
5221 }
04ed9e10 5222
56047f5a
JG
5223 next:
5224 pthread_mutex_unlock(&stream->lock);
5225 }
04ed9e10 5226 }
04ed9e10 5227end_rcu_unlock:
04ed9e10
JG
5228end:
5229 return ret;
5230
5231error_unlock:
5232 pthread_mutex_unlock(&stream->lock);
5233 goto end_rcu_unlock;
5234}
881fc67f
MD
5235
5236void lttng_consumer_sigbus_handle(void *addr)
5237{
5238 lttng_ustconsumer_sigbus_handle(addr);
5239}
This page took 0.438443 seconds and 4 git commands to generate.