Build fix: consumer: unused-but-set-variable warning
[lttng-tools.git] / src / common / consumer / consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
c9e313bc
SM
11#include <common/align.hpp>
12#include <common/common.hpp>
13#include <common/compat/endian.hpp>
14#include <common/compat/poll.hpp>
15#include <common/consumer/consumer-metadata-cache.hpp>
16#include <common/consumer/consumer-stream.hpp>
17#include <common/consumer/consumer-testpoint.hpp>
18#include <common/consumer/consumer-timer.hpp>
19#include <common/consumer/consumer.hpp>
20#include <common/dynamic-array.hpp>
21#include <common/index/ctf-index.hpp>
22#include <common/index/index.hpp>
23#include <common/kernel-consumer/kernel-consumer.hpp>
24#include <common/kernel-ctl/kernel-ctl.hpp>
25#include <common/relayd/relayd.hpp>
26#include <common/sessiond-comm/relayd.hpp>
27#include <common/sessiond-comm/sessiond-comm.hpp>
28#include <common/string-utils/format.hpp>
29#include <common/time.hpp>
30#include <common/trace-chunk-registry.hpp>
31#include <common/trace-chunk.hpp>
56047f5a 32#include <common/urcu.hpp>
c9e313bc
SM
33#include <common/ust-consumer/ust-consumer.hpp>
34#include <common/utils.hpp>
3bd1e081 35
28ab034a
JG
36#include <bin/lttng-consumerd/health-consumerd.hpp>
37#include <inttypes.h>
38#include <poll.h>
39#include <pthread.h>
40#include <signal.h>
41#include <stdlib.h>
42#include <string.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <unistd.h>
47
/* Global consumer state shared by all consumer threads. */
lttng_consumer_global_data the_consumer_data;

/* Commands that can be carried by a consumer_channel_msg. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_DEL,
	CONSUMER_CHANNEL_QUIT,
};

namespace {
/* Message exchanged over the consumer channel pipe between threads. */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan; /* add */
	uint64_t key; /* del */
};

/*
 * Global hash table containing respectively metadata and data streams. The
 * stream element in this ht should only be updated by the metadata poll thread
 * for the metadata and the data poll thread for the data.
 */
struct lttng_ht *metadata_ht;
struct lttng_ht *data_ht;
} /* namespace */

/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;

/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
int consumer_quit;
/*
 * Return the default trace directory name for the consumer's domain
 * (kernel or user space). Aborts on an unknown consumer type.
 */
static const char *get_consumer_domain()
{
	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return DEFAULT_KERNEL_TRACE_DIR;
	case LTTNG_CONSUMER64_UST:
		/* Fall-through. */
	case LTTNG_CONSUMER32_UST:
		return DEFAULT_UST_TRACE_DIR;
	default:
		abort();
	}
}
/*
 * Notify a thread lttng pipe to poll back again. This usually means that some
 * global state has changed so we just send back the thread in a poll wait
 * call.
 */
static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
{
	/* A null stream pointer is the conventional "wake-up" token. */
	struct lttng_consumer_stream *null_stream = nullptr;

	LTTNG_ASSERT(pipe);

	/* Best-effort: a failed write only delays the thread's wake-up. */
	(void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
									     pointer. */
}
5c635c72
MD
112static void notify_health_quit_pipe(int *pipe)
113{
6cd525e8 114 ssize_t ret;
5c635c72 115
6cd525e8
MD
116 ret = lttng_write(pipe[1], "4", 1);
117 if (ret < 1) {
5c635c72
MD
118 PERROR("write consumer health quit");
119 }
120}
121
d8ef542d 122static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
123 struct lttng_consumer_channel *chan,
124 uint64_t key,
125 enum consumer_channel_action action)
d8ef542d
MD
126{
127 struct consumer_channel_msg msg;
6cd525e8 128 ssize_t ret;
d8ef542d 129
e56251fc
DG
130 memset(&msg, 0, sizeof(msg));
131
d8ef542d
MD
132 msg.action = action;
133 msg.chan = chan;
f21dae48 134 msg.key = key;
6cd525e8
MD
135 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
136 if (ret < sizeof(msg)) {
137 PERROR("notify_channel_pipe write error");
138 }
d8ef542d
MD
139}
140
/*
 * Ask the channel-management thread to delete the channel identified by key.
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
{
	notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
}
d8ef542d 146static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
147 struct lttng_consumer_channel **chan,
148 uint64_t *key,
149 enum consumer_channel_action *action)
d8ef542d
MD
150{
151 struct consumer_channel_msg msg;
6cd525e8 152 ssize_t ret;
d8ef542d 153
6cd525e8
MD
154 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
155 if (ret < sizeof(msg)) {
156 ret = -1;
157 goto error;
d8ef542d 158 }
6cd525e8
MD
159 *action = msg.action;
160 *chan = msg.chan;
161 *key = msg.key;
162error:
163 return (int) ret;
d8ef542d
MD
164}
165
/*
 * Cleanup the stream list of a channel. Those streams are not yet globally
 * visible.
 */
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	LTTNG_ASSERT(channel);

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
		/*
		 * Once a stream is added to this list, the buffers were created so we
		 * have a guarantee that this call will succeed. Setting the monitor
		 * mode to 0 so we don't lock nor try to delete the stream from the
		 * global hash table.
		 */
		stream->monitor = 0;
		consumer_stream_destroy(stream, nullptr);
	}
}
/*
 * Find a stream. The consumer_data.lock must be locked during this
 * call.
 *
 * Returns the stream matching key in ht, or nullptr when not found or
 * when key is the invalid -1ULL sentinel.
 */
static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_consumer_stream *stream = nullptr;

	LTTNG_ASSERT(ht);

	/* -1ULL keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		return nullptr;
	}

	lttng::urcu::read_lock_guard read_lock;

	lttng_ht_lookup(ht, &key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != nullptr) {
		stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
	}

	return stream;
}
da009f2c 217static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
7ad0a0cb
MD
218{
219 struct lttng_consumer_stream *stream;
220
56047f5a 221 lttng::urcu::read_lock_guard read_lock;
ffe60014 222 stream = find_stream(key, ht);
04253271 223 if (stream) {
da009f2c 224 stream->key = (uint64_t) -1ULL;
04253271
MD
225 /*
226 * We don't want the lookup to match, but we still need
227 * to iterate on this stream when iterating over the hash table. Just
228 * change the node key.
229 */
da009f2c 230 stream->node.key = (uint64_t) -1ULL;
04253271 231 }
7ad0a0cb
MD
232}
233
/*
 * Return a channel object for the given key.
 *
 * RCU read side lock MUST be acquired before calling this function and
 * protects the channel ptr.
 */
struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_consumer_channel *channel = nullptr;

	ASSERT_RCU_READ_LOCKED();

	/* -1ULL keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		return nullptr;
	}

	lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != nullptr) {
		channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
	}

	return channel;
}
/*
 * There is a possibility that the consumer does not have enough time between
 * the close of the channel on the session daemon and the cleanup in here thus
 * once we have a channel add with an existing key, we know for sure that this
 * channel will eventually get cleaned up by all streams being closed.
 *
 * This function just nullifies the already existing channel key.
 */
static void steal_channel_key(uint64_t key)
{
	struct lttng_consumer_channel *channel;

	lttng::urcu::read_lock_guard read_lock;
	channel = consumer_find_channel(key);
	if (channel) {
		channel->key = (uint64_t) -1ULL;
		/*
		 * We don't want the lookup to match, but we still need to iterate on
		 * this channel when iterating over the hash table. Just change the
		 * node key.
		 */
		channel->node.key = (uint64_t) -1ULL;
	}
}
/*
 * RCU callback releasing a channel's domain-specific resources and freeing
 * the channel object itself. Invoked after a grace period via call_rcu().
 */
static void free_channel_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
	struct lttng_consumer_channel *channel =
		lttng::utils::container_of(node, &lttng_consumer_channel::node);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_free_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	free(channel);
}
/*
 * RCU protected relayd socket pair free.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
	struct consumer_relayd_sock_pair *relayd =
		lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing to this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
	free(relayd);
}
/*
 * Destroy and free relayd socket pair object.
 *
 * Removes the pair from the relayd hash table and defers the socket close
 * and free to an RCU callback. Safe to call with nullptr (no-op).
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == nullptr) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd is being or is destroyed */
		return;
	}

	/* RCU free() call */
	call_rcu(&relayd->node.head, free_relayd_rcu);
}
/*
 * Remove a channel from the global list protected by a mutex. This function is
 * also responsible for freeing its data structures.
 *
 * Lock ordering: the_consumer_data.lock is taken before channel->lock.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_iter iter;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Destroy streams that might have been left in the stream list. */
	clean_channel_stream_list(channel);

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}
	if (channel->monitor_timer_enabled == 1) {
		consumer_timer_monitor_stop(channel);
	}

	/*
	 * Send a last buffer statistics sample to the session daemon
	 * to ensure it tracks the amount of data consumed by this channel.
	 */
	sample_and_send_channel_buffer_stats(channel);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
		goto end;
	}

	/* Release the channel's reference to its current trace chunk. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = nullptr;

	if (channel->is_published) {
		int ret;

		lttng::urcu::read_lock_guard read_lock;
		iter.iter.node = &channel->node.node;
		ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
		LTTNG_ASSERT(!ret);

		iter.iter.node = &channel->channels_by_session_id_ht_node.node;
		ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
		LTTNG_ASSERT(!ret);
	}

	channel->is_deleted = true;
	/* Defer the actual free to an RCU callback. */
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table.
 */
static void cleanup_relayd_ht()
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	{
		/* Scope the RCU read lock to the iteration only. */
		lttng::urcu::read_lock_guard read_lock;

		cds_lfht_for_each_entry (
			the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
			consumer_destroy_relayd(relayd);
		}
	}

	lttng_ht_destroy(the_consumer_data.relayd_ht);
}
/*
 * Update the end point status of all streams having the given network sequence
 * index (relayd index).
 *
 * It's atomically set without having the stream mutex locked which is fine
 * because we handle the write/read race with a pipe wakeup for each thread.
 */
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
					     enum consumer_endpoint_status status)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);

	lttng::urcu::read_lock_guard read_lock;

	/* Let's begin with metadata */
	cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
		}
	}

	/* Follow up by the data streams */
	cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to data stream %d", stream->wait_fd);
		}
	}
}
/*
 * Cleanup a relayd object by flagging every associated streams for deletion,
 * destroying the object meaning removing it from the relayd hash table,
 * closing the sockets and freeing the memory in a RCU call.
 *
 * If a local data context is available, notify the threads that the streams'
 * state have changed.
 */
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
	uint64_t netidx;

	LTTNG_ASSERT(relayd);

	DBG("Cleaning up relayd object ID %" PRIu64, relayd->net_seq_idx);

	/* Save the net sequence index before destroying the object */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 *
	 * NOTE(review): relayd is still dereferenceable here because
	 * consumer_destroy_relayd() only schedules the free via call_rcu() —
	 * presumably the caller holds the RCU read lock; confirm at call sites.
	 */
	notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
	notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
/*
 * Flag a relayd socket pair for destruction. Destroy it if the refcount
 * reaches zero.
 *
 * RCU read side lock MUST be acquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	LTTNG_ASSERT(relayd);
	ASSERT_RCU_READ_LOCKED();

	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}
/*
 * Completely destroy stream from every visible data structure and the given
 * hash table if one.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
/*
 * XXX naming of del vs destroy is all mixed up.
 */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	/* Destroy a stream registered in the global data stream hash table. */
	consumer_stream_destroy(stream, data_ht);
}
/* Destroy a stream registered in the global metadata stream hash table. */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
/*
 * Copy the channel's tracefile size into the stream's read-only attribute
 * snapshot (channel_read_only_attributes).
 */
void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
					       struct lttng_consumer_channel *channel)
{
	stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
}
/*
 * Add a stream to the global list protected by a mutex.
 *
 * Lock ordering: consumer data lock, channel lock, channel timer lock,
 * stream lock, then the RCU read lock.
 */
void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	lttng::urcu::read_lock_guard read_lock;

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	the_consumer_data.stream_count++;
	the_consumer_data.need_update = 1;

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
/*
 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
 * be acquired before calling this.
 *
 * Always returns 0; an existing relayd with the same net sequence index is
 * left untouched.
 */
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret = 0;
	struct lttng_ht_node_u64 *node;
	struct lttng_ht_iter iter;

	LTTNG_ASSERT(relayd);
	ASSERT_RCU_READ_LOCKED();

	/* Do not replace an existing entry for this net sequence index. */
	lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != nullptr) {
		goto end;
	}
	lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);

end:
	return ret;
}
/*
 * Allocate and return a consumer relayd socket.
 *
 * Returns NULL on allocation failure or when net_seq_idx is the invalid
 * -1ULL sentinel.
 */
static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
{
	struct consumer_relayd_sock_pair *obj = nullptr;

	/* net sequence index of -1 is a failure */
	if (net_seq_idx == (uint64_t) -1ULL) {
		goto error;
	}

	obj = zmalloc<consumer_relayd_sock_pair>();
	if (obj == nullptr) {
		PERROR("zmalloc relayd sock");
		goto error;
	}

	obj->net_seq_idx = net_seq_idx;
	obj->refcount = 0;
	obj->destroy_flag = 0;
	/* Sockets start closed (-1); fds are assigned on connection. */
	obj->control_sock.sock.fd = -1;
	obj->data_sock.sock.fd = -1;
	lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
	pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);

error:
	return obj;
}
675 *
676 * Return the object if found else NULL.
b0b335c8
MD
677 * RCU read-side lock must be held across this call and while using the
678 * returned object.
00e2e675 679 */
d88aee68 680struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
00e2e675
DG
681{
682 struct lttng_ht_iter iter;
d88aee68 683 struct lttng_ht_node_u64 *node;
cd9adb8b 684 struct consumer_relayd_sock_pair *relayd = nullptr;
00e2e675 685
48b7cdc2
FD
686 ASSERT_RCU_READ_LOCKED();
687
00e2e675 688 /* Negative keys are lookup failures */
d88aee68 689 if (key == (uint64_t) -1ULL) {
00e2e675
DG
690 goto error;
691 }
692
fa29bfbf 693 lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
d88aee68 694 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 695 if (node != nullptr) {
0114db0e 696 relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
00e2e675
DG
697 }
698
00e2e675
DG
699error:
700 return relayd;
701}
702
10a50311
JD
703/*
704 * Find a relayd and send the stream
705 *
706 * Returns 0 on success, < 0 on error
707 */
28ab034a 708int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
10a50311
JD
709{
710 int ret = 0;
711 struct consumer_relayd_sock_pair *relayd;
712
a0377dfe
FD
713 LTTNG_ASSERT(stream);
714 LTTNG_ASSERT(stream->net_seq_idx != -1ULL);
715 LTTNG_ASSERT(path);
10a50311
JD
716
717 /* The stream is not metadata. Get relayd reference if exists. */
56047f5a 718 lttng::urcu::read_lock_guard read_lock;
10a50311 719 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 720 if (relayd != nullptr) {
10a50311
JD
721 /* Add stream on the relayd */
722 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a
JG
723 ret = relayd_add_stream(&relayd->control_sock,
724 stream->name,
725 get_consumer_domain(),
726 path,
727 &stream->relayd_stream_id,
728 stream->chan->tracefile_size,
729 stream->chan->tracefile_count,
730 stream->trace_chunk);
10a50311
JD
731 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
732 if (ret < 0) {
28ab034a
JG
733 ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
734 relayd->net_seq_idx);
9276e5c8 735 lttng_consumer_cleanup_relayd(relayd);
10a50311
JD
736 goto end;
737 }
1c20f0e2 738
10a50311 739 uatomic_inc(&relayd->refcount);
d01178b6 740 stream->sent_to_relayd = 1;
10a50311
JD
741 } else {
742 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
28ab034a
JG
743 stream->key,
744 stream->net_seq_idx);
10a50311
JD
745 ret = -1;
746 goto end;
747 }
748
749 DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
28ab034a
JG
750 stream->name,
751 stream->key,
752 stream->net_seq_idx);
10a50311
JD
753
754end:
10a50311
JD
755 return ret;
756}
757
a4baae1b
JD
758/*
759 * Find a relayd and send the streams sent message
760 *
761 * Returns 0 on success, < 0 on error
762 */
763int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
764{
765 int ret = 0;
766 struct consumer_relayd_sock_pair *relayd;
767
a0377dfe 768 LTTNG_ASSERT(net_seq_idx != -1ULL);
a4baae1b
JD
769
770 /* The stream is not metadata. Get relayd reference if exists. */
56047f5a 771 lttng::urcu::read_lock_guard read_lock;
a4baae1b 772 relayd = consumer_find_relayd(net_seq_idx);
cd9adb8b 773 if (relayd != nullptr) {
a4baae1b
JD
774 /* Add stream on the relayd */
775 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
776 ret = relayd_streams_sent(&relayd->control_sock);
777 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
778 if (ret < 0) {
28ab034a
JG
779 ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
780 relayd->net_seq_idx);
9276e5c8 781 lttng_consumer_cleanup_relayd(relayd);
a4baae1b
JD
782 goto end;
783 }
784 } else {
28ab034a 785 ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
a4baae1b
JD
786 ret = -1;
787 goto end;
788 }
789
790 ret = 0;
791 DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
792
793end:
a4baae1b
JD
794 return ret;
795}
796
/*
 * Find a relayd and close the stream
 */
void close_relayd_stream(struct lttng_consumer_stream *stream)
{
	struct consumer_relayd_sock_pair *relayd;

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd) {
		consumer_stream_relayd_close(stream, relayd);
	}
}
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
				      size_t data_size,
				      unsigned long padding,
				      struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);

		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be subtracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
/*
 * Write a character on the metadata poll pipe to wake the metadata thread.
 * Returns 0 on success, -1 on error.
 */
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
{
	int ret = 0;

	DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
	if (channel->monitor && channel->metadata_stream) {
		const char dummy = 'c';
		const ssize_t write_ret =
			lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);

		if (write_ret < 1) {
			if (errno == EWOULDBLOCK) {
				/*
				 * This is fine, the metadata poll thread
				 * is having a hard time keeping-up, but
				 * it will eventually wake-up and consume
				 * the available data.
				 */
				ret = 0;
			} else {
				PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
				ret = -1;
				goto end;
			}
		}
	}

end:
	return ret;
}
/*
 * Trigger a dump of the metadata content. Following/during the successful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 */
static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
	int ret;

	ASSERT_LOCKED(stream->chan->lock);
	ASSERT_LOCKED(stream->lock);
	LTTNG_ASSERT(stream->metadata_flag);
	LTTNG_ASSERT(stream->chan->trace_chunk);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Reset the position of what has been read from the
		 * metadata cache to 0 so we can dump it again.
		 */
		ret = kernctl_metadata_cache_dump(stream->wait_fd);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Reset the position pushed from the metadata cache so it
		 * will write from the beginning on the next push.
		 */
		stream->ust_metadata_pushed = 0;
		ret = consumer_metadata_wakeup_pipe(stream->chan);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	if (ret < 0) {
		ERR("Failed to dump the metadata cache");
	}
	return ret;
}
/*
 * Set (or replace) the trace chunk associated with a channel.
 *
 * A reference is acquired on new_trace_chunk (when non-NULL) and the
 * channel's reference to its previous chunk, if any, is released.
 * Passing NULL therefore simply drops the channel's current chunk.
 *
 * Always returns 0: a logically-deleted channel is left untouched and
 * still reported as success since there is nothing for the caller to do.
 */
static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
						  struct lttng_trace_chunk *new_trace_chunk)
{
	pthread_mutex_lock(&channel->lock);
	if (channel->is_deleted) {
		/*
		 * The channel has been logically deleted and should no longer
		 * be used. It has released its reference to its current trace
		 * chunk and should not acquire a new one.
		 *
		 * Return success as there is nothing for the caller to do.
		 */
		goto end;
	}

	/*
	 * The acquisition of the reference cannot fail (barring
	 * a severe internal error) since a reference to the published
	 * chunk is already held by the caller.
	 */
	if (new_trace_chunk) {
		const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);

		LTTNG_ASSERT(acquired_reference);
	}

	/* Release the old chunk (lttng_trace_chunk_put accepts NULL). */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = new_trace_chunk;
end:
	pthread_mutex_unlock(&channel->lock);
	return 0;
}
983
/*
 * Allocate and return a new lttng_consumer_channel object using the given key
 * to initialize the hash table node.
 *
 * On error, return NULL.
 */
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
							 uint64_t session_id,
							 const uint64_t *chunk_id,
							 const char *pathname,
							 const char *name,
							 uint64_t relayd_id,
							 enum lttng_event_output output,
							 uint64_t tracefile_size,
							 uint64_t tracefile_count,
							 uint64_t session_id_per_pid,
							 unsigned int monitor,
							 unsigned int live_timer_interval,
							 bool is_in_live_session,
							 const char *root_shm_path,
							 const char *shm_path)
{
	struct lttng_consumer_channel *channel = nullptr;
	struct lttng_trace_chunk *trace_chunk = nullptr;

	/*
	 * Resolve the target trace chunk (if any) first; a channel may not be
	 * created against an unknown chunk.
	 */
	if (chunk_id) {
		trace_chunk = lttng_trace_chunk_registry_find_chunk(
			the_consumer_data.chunk_registry, session_id, *chunk_id);
		if (!trace_chunk) {
			ERR("Failed to find trace chunk reference during creation of channel");
			goto end;
		}
	}

	channel = zmalloc<lttng_consumer_channel>();
	if (channel == nullptr) {
		PERROR("malloc struct lttng_consumer_channel");
		goto end;
	}

	channel->key = key;
	channel->refcount = 0;
	channel->session_id = session_id;
	channel->session_id_per_pid = session_id_per_pid;
	channel->relayd_id = relayd_id;
	channel->tracefile_size = tracefile_size;
	channel->tracefile_count = tracefile_count;
	channel->monitor = monitor;
	channel->live_timer_interval = live_timer_interval;
	channel->is_live = is_in_live_session;
	pthread_mutex_init(&channel->lock, nullptr);
	pthread_mutex_init(&channel->timer_lock, nullptr);

	switch (output) {
	case LTTNG_EVENT_SPLICE:
		channel->output = CONSUMER_CHANNEL_SPLICE;
		break;
	case LTTNG_EVENT_MMAP:
		channel->output = CONSUMER_CHANNEL_MMAP;
		break;
	default:
		abort();
		/*
		 * NOTE(review): abort() does not return, so the cleanup below
		 * is unreachable defensive code kept for form.
		 */
		free(channel);
		channel = nullptr;
		goto end;
	}

	/*
	 * In monitor mode, the streams associated with the channel will be put in
	 * a special list ONLY owned by this channel. So, the refcount is set to 1
	 * here meaning that the channel itself has streams that are referenced.
	 *
	 * On a channel deletion, once the channel is no longer visible, the
	 * refcount is decremented and checked for a zero value to delete it. With
	 * streams in no monitor mode, it will now be safe to destroy the channel.
	 */
	if (!channel->monitor) {
		channel->refcount = 1;
	}

	/* Copies below are truncated and explicitly NUL-terminated. */
	strncpy(channel->pathname, pathname, sizeof(channel->pathname));
	channel->pathname[sizeof(channel->pathname) - 1] = '\0';

	strncpy(channel->name, name, sizeof(channel->name));
	channel->name[sizeof(channel->name) - 1] = '\0';

	if (root_shm_path) {
		strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
		channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
	}
	if (shm_path) {
		strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
		channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
	}

	/* Initialize hash-table nodes; the channel is not published yet. */
	lttng_ht_node_init_u64(&channel->node, channel->key);
	lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);

	channel->wait_fd = -1;
	CDS_INIT_LIST_HEAD(&channel->streams.head);

	if (trace_chunk) {
		int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
		if (ret) {
			goto error;
		}
	}

	DBG("Allocated channel (key %" PRIu64 ")", channel->key);

end:
	/* Drop the lookup reference; the channel holds its own if set. */
	lttng_trace_chunk_put(trace_chunk);
	return channel;
error:
	consumer_del_channel(channel);
	channel = nullptr;
	goto end;
}
1102
/*
 * Add a channel to the global list protected by a mutex.
 *
 * Always return 0 indicating success.
 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
			 struct lttng_consumer_local_data *ctx)
{
	/*
	 * Lock order: global consumer data lock, then channel lock, then the
	 * channel's timer lock.
	 */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&channel->timer_lock);

	/*
	 * This gives us a guarantee that the channel we are about to add to the
	 * channel hash table will be unique. See this function comment on the why
	 * we need to steal the channel key at this stage.
	 */
	steal_channel_key(channel->key);

	/* Publish the channel in both lookup hash tables under RCU. */
	lttng::urcu::read_lock_guard read_lock;
	lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
	lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
			 &channel->channels_by_session_id_ht_node);
	channel->is_published = true;

	pthread_mutex_unlock(&channel->timer_lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	/* Wake the data poll thread so it picks up the new data channel. */
	if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
		notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
	}

	return 0;
}
1138
/*
 * Allocate the pollfd structure and the local view of the out fds to avoid
 * doing a lookup in the linked list and concurrency issues when writing is
 * needed. Called with consumer_data.lock held.
 *
 * Returns the number of fds in the structures.
 */
static int update_poll_array(struct lttng_consumer_local_data *ctx,
			     struct pollfd **pollfd,
			     struct lttng_consumer_stream **local_stream,
			     struct lttng_ht *ht,
			     int *nb_inactive_fd)
{
	int i = 0;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(ht);
	LTTNG_ASSERT(pollfd);
	LTTNG_ASSERT(local_stream);

	DBG("Updating poll fd array");
	*nb_inactive_fd = 0;

	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
			/*
			 * Only active streams with an active end point can be added to the
			 * poll set and local stream storage of the thread.
			 *
			 * There is a potential race here for endpoint_status to be updated
			 * just after the check. However, this is OK since the stream(s) will
			 * be deleted once the thread is notified that the end point state has
			 * changed where this function will be called back again.
			 *
			 * We track the number of inactive FDs because they still need to be
			 * closed by the polling thread after a wakeup on the data_pipe or
			 * metadata_pipe.
			 */
			if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
				(*nb_inactive_fd)++;
				continue;
			}

			(*pollfd)[i].fd = stream->wait_fd;
			(*pollfd)[i].events = POLLIN | POLLPRI;
			local_stream[i] = stream;
			i++;
		}
	}

	/*
	 * Insert the consumer_data_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD.
	 */
	(*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
	(*pollfd)[i].events = POLLIN | POLLPRI;

	/* The wakeup pipe occupies the slot right after the data pipe. */
	(*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
	(*pollfd)[i + 1].events = POLLIN | POLLPRI;
	return i;
}
1203
1204/*
84382d49
MD
1205 * Poll on the should_quit pipe and the command socket return -1 on
1206 * error, 1 if should exit, 0 if data is available on the command socket
3bd1e081
MD
1207 */
1208int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1209{
1210 int num_rdy;
1211
88f2b785 1212restart:
3bd1e081
MD
1213 num_rdy = poll(consumer_sockpoll, 2, -1);
1214 if (num_rdy == -1) {
88f2b785
MD
1215 /*
1216 * Restart interrupted system call.
1217 */
1218 if (errno == EINTR) {
1219 goto restart;
1220 }
7a57cf92 1221 PERROR("Poll error");
84382d49 1222 return -1;
3bd1e081 1223 }
509bb1cf 1224 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
3bd1e081 1225 DBG("consumer_should_quit wake up");
84382d49 1226 return 1;
3bd1e081
MD
1227 }
1228 return 0;
3bd1e081
MD
1229}
1230
1231/*
1232 * Set the error socket.
1233 */
28ab034a 1234void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
3bd1e081
MD
1235{
1236 ctx->consumer_error_socket = sock;
1237}
1238
1239/*
1240 * Set the command socket path.
1241 */
28ab034a 1242void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
3bd1e081
MD
1243{
1244 ctx->consumer_command_sock_path = sock;
1245}
1246
1247/*
1248 * Send return code to the session daemon.
1249 * If the socket is not defined, we return 0, it is not a fatal error
1250 */
ffe60014 1251int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
3bd1e081
MD
1252{
1253 if (ctx->consumer_error_socket > 0) {
28ab034a
JG
1254 return lttcomm_send_unix_sock(
1255 ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
3bd1e081
MD
1256 }
1257
1258 return 0;
1259}
1260
/*
 * Close all the tracefiles and stream fds and MUST be called when all
 * instances are destroyed i.e. when all threads were joined and are ended.
 */
void lttng_consumer_cleanup()
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	unsigned int trace_chunks_left;

	{
		lttng::urcu::read_lock_guard read_lock;

		/* Delete every remaining channel before tearing down the tables. */
		cds_lfht_for_each_entry (
			the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
			consumer_del_channel(channel);
		}
	}

	lttng_ht_destroy(the_consumer_data.channel_ht);
	lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);

	cleanup_relayd_ht();

	lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);

	/*
	 * This HT contains streams that are freed by either the metadata thread or
	 * the data thread so we do *nothing* on the hash table and simply destroy
	 * it.
	 */
	lttng_ht_destroy(the_consumer_data.stream_list_ht);

	/*
	 * Trace chunks in the registry may still exist if the session
	 * daemon has encountered an internal error and could not
	 * tear down its sessions and/or trace chunks properly.
	 *
	 * Release the session daemon's implicit reference to any remaining
	 * trace chunk and print an error if any trace chunk was found. Note
	 * that there are _no_ legitimate cases for trace chunks to be left,
	 * it is a leak. However, it can happen following a crash of the
	 * session daemon and not emptying the registry would cause an assertion
	 * to hit.
	 */
	trace_chunks_left =
		lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
	if (trace_chunks_left) {
		ERR("%u trace chunks are leaked by lttng-consumerd. "
		    "This can be caused by an internal error of the session daemon.",
		    trace_chunks_left);
	}
	/* Run all callbacks freeing each chunk. */
	rcu_barrier();
	lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
}
1317
/*
 * Called from signal handler.
 *
 * Raises the shared quit flag and pokes the should-quit pipe so blocked
 * consumer threads wake up and observe it.
 */
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
{
	ssize_t ret;

	/* Flag polled by the consumer threads to initiate shutdown. */
	CMM_STORE_SHARED(consumer_quit, 1);
	/* The byte's value ("4") is irrelevant; it is only a wake-up token. */
	ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
	if (ret < 1) {
		PERROR("write consumer quit");
	}

	DBG("Consumer flag that it should quit");
}
1333
/*
 * Flush pending writes to trace output disk file.
 *
 * Best-effort: errors from the hint syscalls are logged but not returned.
 */
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
	int ret;
	int outfd = stream->out_fd;

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset < stream->max_sb_size) {
		/* No complete previous subbuffer exists yet; nothing to flush. */
		return;
	}
	lttng_sync_file_range(outfd,
			      orig_offset - stream->max_sb_size,
			      stream->max_sb_size,
			      SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
				      SYNC_FILE_RANGE_WAIT_AFTER);
	/*
	 * Give hints to the kernel about how we access the file:
	 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
	 * we write it.
	 *
	 * We need to call fadvise again after the file grows because the
	 * kernel does not seem to apply fadvise to non-existing parts of the
	 * file.
	 *
	 * Call fadvise _after_ having waited for the page writeback to
	 * complete because the dirty page writeback semantic is not well
	 * defined. So it can be expected to lead to lower throughput in
	 * streaming.
	 */
	/* posix_fadvise() returns the error number directly (it does not set errno). */
	ret = posix_fadvise(
		outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
	if (ret && ret != -ENOSYS) {
		errno = ret;
		PERROR("posix_fadvise on fd %i", outfd);
	}
}
1377
/*
 * Initialise the necessary environment :
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument, this function is called when data is
 * available on a buffer. This function is responsible to do the
 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
 * buffer configuration and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *
lttng_consumer_create(enum lttng_consumer_type type,
		      ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
					      struct lttng_consumer_local_data *ctx,
					      bool locked_by_caller),
		      int (*recv_channel)(struct lttng_consumer_channel *channel),
		      int (*recv_stream)(struct lttng_consumer_stream *stream),
		      int (*update_stream)(uint64_t stream_key, uint32_t state))
{
	int ret;
	struct lttng_consumer_local_data *ctx;

	/* A consumer process is of a single type for its whole lifetime. */
	LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
		     the_consumer_data.type == type);
	the_consumer_data.type = type;

	ctx = zmalloc<lttng_consumer_local_data>();
	if (ctx == nullptr) {
		PERROR("allocating context");
		goto error;
	}

	ctx->consumer_error_socket = -1;
	ctx->consumer_metadata_socket = -1;
	pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ctx->consumer_data_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_data_pipe) {
		goto error_poll_pipe;
	}

	ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_wakeup_pipe) {
		goto error_wakeup_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		PERROR("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_channel_pipe);
	if (ret < 0) {
		PERROR("Error creating channel pipe");
		goto error_channel_pipe;
	}

	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_metadata_pipe) {
		goto error_metadata_pipe;
	}

	/* -1 denotes "no channel monitoring pipe set". */
	ctx->channel_monitor_pipe = -1;

	return ctx;

	/* Unwind acquired resources in reverse order of acquisition. */
error_metadata_pipe:
	utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
	utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
	lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
error_wakeup_pipe:
	lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
	free(ctx);
error:
	return nullptr;
}
1467
/*
 * Iterate over all streams of the hashtable and free them properly.
 */
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	if (ht == nullptr) {
		return;
	}

	{
		/* RCU read-side protection for the lock-free hash-table walk. */
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
			/*
			 * Ignore return value since we are currently cleaning up so any error
			 * can't be handled.
			 */
			(void) consumer_del_stream(stream, ht);
		}
	}

	lttng_ht_destroy(ht);
}
1493
/*
 * Iterate over all streams of the metadata hashtable and free them
 * properly.
 */
static void destroy_metadata_stream_ht(struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	if (ht == nullptr) {
		return;
	}

	{
		/* RCU read-side protection for the lock-free hash-table walk. */
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
			/*
			 * Ignore return value since we are currently cleaning up so any error
			 * can't be handled.
			 */
			(void) consumer_del_metadata_stream(stream, ht);
		}
	}

	lttng_ht_destroy(ht);
}
1520
/*
 * Close all fds associated with the instance and free the context.
 */
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
{
	int ret;

	DBG("Consumer destroying it. Closing everything.");

	if (!ctx) {
		return;
	}

	/* Tear down both stream hash tables before closing the context fds. */
	destroy_data_stream_ht(data_ht);
	destroy_metadata_stream_ht(metadata_ht);

	ret = close(ctx->consumer_error_socket);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_metadata_socket);
	if (ret) {
		PERROR("close");
	}
	utils_close_pipe(ctx->consumer_channel_pipe);
	lttng_pipe_destroy(ctx->consumer_data_pipe);
	lttng_pipe_destroy(ctx->consumer_metadata_pipe);
	lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
	utils_close_pipe(ctx->consumer_should_quit);

	/* Remove the command socket file from the filesystem. */
	unlink(ctx->consumer_command_sock_path);
	free(ctx);
}
1554
/*
 * Write the metadata stream id on the specified file descriptor.
 *
 * Returns the number of bytes written on success, -1 on error.
 */
static int
write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
{
	ssize_t ret;
	struct lttcomm_relayd_metadata_payload hdr;

	/* The relayd wire protocol expects big-endian header fields. */
	hdr.stream_id = htobe64(stream->relayd_stream_id);
	hdr.padding_size = htobe32(padding);
	ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
	if (ret < sizeof(hdr)) {
		/*
		 * This error means that the fd's end is closed so ignore the PERROR
		 * not to clobber the error output since this can happen in a normal
		 * code path.
		 */
		if (errno != EPIPE) {
			PERROR("write metadata stream id");
		}
		DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
		/*
		 * Set ret to a negative value because if ret != sizeof(hdr), we don't
		 * handle writing the missing part so report that as an error and
		 * don't lie to the caller.
		 */
		ret = -1;
		goto end;
	}
	DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
	    stream->relayd_stream_id,
	    padding);

end:
	return (int) ret;
}
1592
/*
 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
 * core function for writing trace buffers to either the local filesystem or
 * the network.
 *
 * It must be called with the stream and the channel lock held.
 *
 * Careful review MUST be put if any changes occur!
 *
 * Returns the number of bytes written
 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
					      const struct lttng_buffer_view *buffer,
					      unsigned long padding)
{
	ssize_t ret = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	unsigned int relayd_hang_up = 0;
	/* Network output strips trailing padding; local output keeps it. */
	const size_t subbuf_content_size = buffer->size - padding;
	size_t write_len;

	/* RCU lock for the relayd pointer */
	lttng::urcu::read_lock_guard read_lock;
	LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == nullptr) {
			ret = -EPIPE;
			goto end;
		}
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = subbuf_content_size;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->reset_metadata_flag) {
				ret = relayd_reset_metadata(&relayd->control_sock,
							    stream->relayd_stream_id,
							    stream->metadata_version);
				if (ret < 0) {
					relayd_hang_up = 1;
					goto write_error;
				}
				stream->reset_metadata_flag = 0;
			}
			/* Account for the metadata payload header sent below. */
			netlen += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, netlen, padding, relayd);
		if (ret < 0) {
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Use the returned socket. */
		outfd = ret;

		/* Write metadata stream id before payload */
		if (stream->metadata_flag) {
			ret = write_relayd_metadata_id(outfd, stream, padding);
			if (ret < 0) {
				relayd_hang_up = 1;
				goto write_error;
			}
		}

		write_len = subbuf_content_size;
	} else {
		/* No streaming; we have to write the full padding. */
		if (stream->metadata_flag && stream->reset_metadata_flag) {
			ret = utils_truncate_stream_file(stream->out_fd, 0);
			if (ret < 0) {
				ERR("Reset metadata file");
				goto end;
			}
			stream->reset_metadata_flag = 0;
		}

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
		    (stream->tracefile_size_current + buffer->size) >
			    stream->chan->tracefile_size) {
			ret = consumer_stream_rotate_output_files(stream);
			if (ret) {
				goto end;
			}
			/* Rotation replaced the output fd; restart at offset 0. */
			outfd = stream->out_fd;
			orig_offset = 0;
		}
		stream->tracefile_size_current += buffer->size;
		write_len = buffer->size;
	}

	/*
	 * This call guarantee that len or less is returned. It's impossible to
	 * receive a ret value that is bigger than len.
	 */
	ret = lttng_write(outfd, buffer->data, write_len);
	DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
	if (ret < 0 || ((size_t) ret != write_len)) {
		/*
		 * Report error to caller if nothing was written else at least send the
		 * amount written.
		 */
		if (ret < 0) {
			ret = -errno;
		}
		relayd_hang_up = 1;

		/* Socket operation failed. We consider the relayd dead */
		if (errno == EPIPE) {
			/*
			 * This is possible if the fd is closed on the other side
			 * (outfd) or any write problem. It can be verbose a bit for a
			 * normal execution if for instance the relayd is stopped
			 * abruptly. This can happen so set this to a DBG statement.
			 */
			DBG("Consumer mmap write detected relayd hang up");
		} else {
			/* Unhandled error, print it and stop function right now. */
			PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
		}
		goto write_error;
	}
	stream->output_written += ret;

	/* This call is useless on a socket so better save a syscall. */
	if (!relayd) {
		/* This won't block, but will start writeout asynchronously */
		lttng_sync_file_range(
			outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
		stream->out_fd_offset += write_len;
		lttng_consumer_sync_trace_file(stream, orig_offset);
	}

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	return ret;
}
1760
1761/*
1762 * Splice the data from the ring buffer to the tracefile.
1763 *
79d4ffb7
DG
1764 * It must be called with the stream lock held.
1765 *
3bd1e081
MD
1766 * Returns the number of bytes spliced.
1767 */
28ab034a
JG
1768ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
1769 struct lttng_consumer_stream *stream,
1770 unsigned long len,
1771 unsigned long padding)
3bd1e081 1772{
f02e1e8a
DG
1773 ssize_t ret = 0, written = 0, ret_splice = 0;
1774 loff_t offset = 0;
1775 off_t orig_offset = stream->out_fd_offset;
1776 int fd = stream->wait_fd;
1777 /* Default is on the disk */
1778 int outfd = stream->out_fd;
cd9adb8b 1779 struct consumer_relayd_sock_pair *relayd = nullptr;
fb3a43a9 1780 int *splice_pipe;
8994307f 1781 unsigned int relayd_hang_up = 0;
f02e1e8a 1782
fa29bfbf 1783 switch (the_consumer_data.type) {
3bd1e081 1784 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1785 break;
7753dea8
MD
1786 case LTTNG_CONSUMER32_UST:
1787 case LTTNG_CONSUMER64_UST:
f02e1e8a 1788 /* Not supported for user space tracing */
3bd1e081
MD
1789 return -ENOSYS;
1790 default:
1791 ERR("Unknown consumer_data type");
a0377dfe 1792 abort();
3bd1e081
MD
1793 }
1794
f02e1e8a 1795 /* RCU lock for the relayd pointer */
56047f5a 1796 lttng::urcu::read_lock_guard read_lock;
f02e1e8a
DG
1797
1798 /* Flag that the current stream if set for network streaming. */
da009f2c 1799 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1800 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1801 if (relayd == nullptr) {
ad0b0d23 1802 written = -ret;
f02e1e8a
DG
1803 goto end;
1804 }
1805 }
a2361a61 1806 splice_pipe = stream->splice_pipe;
fb3a43a9 1807
f02e1e8a 1808 /* Write metadata stream id before payload */
1d4dfdef 1809 if (relayd) {
ad0b0d23 1810 unsigned long total_len = len;
f02e1e8a 1811
1d4dfdef
DG
1812 if (stream->metadata_flag) {
1813 /*
1814 * Lock the control socket for the complete duration of the function
1815 * since from this point on we will use the socket.
1816 */
1817 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1818
93ec662e
JD
1819 if (stream->reset_metadata_flag) {
1820 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1821 stream->relayd_stream_id,
1822 stream->metadata_version);
93ec662e
JD
1823 if (ret < 0) {
1824 relayd_hang_up = 1;
1825 goto write_error;
1826 }
1827 stream->reset_metadata_flag = 0;
1828 }
28ab034a 1829 ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
1d4dfdef
DG
1830 if (ret < 0) {
1831 written = ret;
ad0b0d23
DG
1832 relayd_hang_up = 1;
1833 goto write_error;
1d4dfdef
DG
1834 }
1835
1836 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1837 }
1838
1839 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
ad0b0d23
DG
1840 if (ret < 0) {
1841 written = ret;
1842 relayd_hang_up = 1;
1843 goto write_error;
f02e1e8a 1844 }
ad0b0d23
DG
1845 /* Use the returned socket. */
1846 outfd = ret;
1d4dfdef
DG
1847 } else {
1848 /* No streaming, we have to set the len with the full padding */
1849 len += padding;
1624d5b7 1850
93ec662e
JD
1851 if (stream->metadata_flag && stream->reset_metadata_flag) {
1852 ret = utils_truncate_stream_file(stream->out_fd, 0);
1853 if (ret < 0) {
1854 ERR("Reset metadata file");
1855 goto end;
1856 }
1857 stream->reset_metadata_flag = 0;
1858 }
1624d5b7
JD
1859 /*
1860 * Check if we need to change the tracefile before writing the packet.
1861 */
1862 if (stream->chan->tracefile_size > 0 &&
28ab034a 1863 (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
d2956687 1864 ret = consumer_stream_rotate_output_files(stream);
1624d5b7 1865 if (ret < 0) {
ad0b0d23 1866 written = ret;
1624d5b7
JD
1867 goto end;
1868 }
309167d2 1869 outfd = stream->out_fd;
a1ae300f 1870 orig_offset = 0;
1624d5b7
JD
1871 }
1872 stream->tracefile_size_current += len;
f02e1e8a
DG
1873 }
1874
1875 while (len > 0) {
1d4dfdef 1876 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
28ab034a
JG
1877 (unsigned long) offset,
1878 len,
1879 fd,
1880 splice_pipe[1]);
1881 ret_splice = splice(
cd9adb8b 1882 fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
f02e1e8a
DG
1883 DBG("splice chan to pipe, ret %zd", ret_splice);
1884 if (ret_splice < 0) {
d02b8372 1885 ret = errno;
ad0b0d23 1886 written = -ret;
d02b8372 1887 PERROR("Error in relay splice");
f02e1e8a
DG
1888 goto splice_error;
1889 }
1890
1891 /* Handle stream on the relayd if the output is on the network */
ad0b0d23
DG
1892 if (relayd && stream->metadata_flag) {
1893 size_t metadata_payload_size =
1894 sizeof(struct lttcomm_relayd_metadata_payload);
1895
1896 /* Update counter to fit the spliced data */
1897 ret_splice += metadata_payload_size;
1898 len += metadata_payload_size;
1899 /*
1900 * We do this so the return value can match the len passed as
1901 * argument to this function.
1902 */
1903 written -= metadata_payload_size;
f02e1e8a
DG
1904 }
1905
1906 /* Splice data out */
28ab034a 1907 ret_splice = splice(splice_pipe[0],
cd9adb8b 1908 nullptr,
28ab034a 1909 outfd,
cd9adb8b 1910 nullptr,
28ab034a
JG
1911 ret_splice,
1912 SPLICE_F_MOVE | SPLICE_F_MORE);
1913 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
f02e1e8a 1914 if (ret_splice < 0) {
d02b8372 1915 ret = errno;
ad0b0d23
DG
1916 written = -ret;
1917 relayd_hang_up = 1;
1918 goto write_error;
f02e1e8a 1919 } else if (ret_splice > len) {
d02b8372
DG
1920 /*
1921 * We don't expect this code path to be executed but you never know
1922 * so this is an extra protection agains a buggy splice().
1923 */
f02e1e8a 1924 ret = errno;
ad0b0d23 1925 written += ret_splice;
28ab034a 1926 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
f02e1e8a 1927 goto splice_error;
d02b8372
DG
1928 } else {
1929 /* All good, update current len and continue. */
1930 len -= ret_splice;
f02e1e8a 1931 }
f02e1e8a
DG
1932
1933 /* This call is useless on a socket so better save a syscall. */
1934 if (!relayd) {
1935 /* This won't block, but will start writeout asynchronously */
28ab034a
JG
1936 lttng_sync_file_range(
1937 outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
f02e1e8a
DG
1938 stream->out_fd_offset += ret_splice;
1939 }
e5d1a9b3 1940 stream->output_written += ret_splice;
f02e1e8a
DG
1941 written += ret_splice;
1942 }
f5dbe415
JG
1943 if (!relayd) {
1944 lttng_consumer_sync_trace_file(stream, orig_offset);
1945 }
f02e1e8a
DG
1946 goto end;
1947
8994307f
DG
1948write_error:
1949 /*
1950 * This is a special case that the relayd has closed its socket. Let's
1951 * cleanup the relayd object and all associated streams.
1952 */
1953 if (relayd && relayd_hang_up) {
28ab034a 1954 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1955 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1956 /* Skip splice error so the consumer does not fail */
1957 goto end;
1958 }
1959
f02e1e8a
DG
1960splice_error:
1961 /* send the appropriate error description to sessiond */
1962 switch (ret) {
f02e1e8a 1963 case EINVAL:
f73fabfd 1964 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
1965 break;
1966 case ENOMEM:
f73fabfd 1967 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
1968 break;
1969 case ESPIPE:
f73fabfd 1970 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
1971 break;
1972 }
1973
1974end:
1975 if (relayd && stream->metadata_flag) {
1976 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1977 }
1978
f02e1e8a 1979 return written;
3bd1e081
MD
1980}
1981
15055ce5
JD
1982/*
1983 * Sample the snapshot positions for a specific fd
1984 *
1985 * Returns 0 on success, < 0 on error
1986 */
1987int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
1988{
fa29bfbf 1989 switch (the_consumer_data.type) {
15055ce5
JD
1990 case LTTNG_CONSUMER_KERNEL:
1991 return lttng_kconsumer_sample_snapshot_positions(stream);
1992 case LTTNG_CONSUMER32_UST:
1993 case LTTNG_CONSUMER64_UST:
1994 return lttng_ustconsumer_sample_snapshot_positions(stream);
1995 default:
1996 ERR("Unknown consumer_data type");
a0377dfe 1997 abort();
15055ce5
JD
1998 return -ENOSYS;
1999 }
2000}
3bd1e081
MD
2001/*
2002 * Take a snapshot for a specific fd
2003 *
2004 * Returns 0 on success, < 0 on error
2005 */
ffe60014 2006int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 2007{
fa29bfbf 2008 switch (the_consumer_data.type) {
3bd1e081 2009 case LTTNG_CONSUMER_KERNEL:
ffe60014 2010 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
2011 case LTTNG_CONSUMER32_UST:
2012 case LTTNG_CONSUMER64_UST:
ffe60014 2013 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
2014 default:
2015 ERR("Unknown consumer_data type");
a0377dfe 2016 abort();
3bd1e081
MD
2017 return -ENOSYS;
2018 }
3bd1e081
MD
2019}
2020
2021/*
2022 * Get the produced position
2023 *
2024 * Returns 0 on success, < 0 on error
2025 */
28ab034a 2026int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 2027{
fa29bfbf 2028 switch (the_consumer_data.type) {
3bd1e081 2029 case LTTNG_CONSUMER_KERNEL:
ffe60014 2030 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
2031 case LTTNG_CONSUMER32_UST:
2032 case LTTNG_CONSUMER64_UST:
ffe60014 2033 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
2034 default:
2035 ERR("Unknown consumer_data type");
a0377dfe 2036 abort();
3bd1e081
MD
2037 return -ENOSYS;
2038 }
2039}
2040
15055ce5
JD
2041/*
2042 * Get the consumed position (free-running counter position in bytes).
2043 *
2044 * Returns 0 on success, < 0 on error
2045 */
28ab034a 2046int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
15055ce5 2047{
fa29bfbf 2048 switch (the_consumer_data.type) {
15055ce5
JD
2049 case LTTNG_CONSUMER_KERNEL:
2050 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2051 case LTTNG_CONSUMER32_UST:
2052 case LTTNG_CONSUMER64_UST:
2053 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2054 default:
2055 ERR("Unknown consumer_data type");
a0377dfe 2056 abort();
15055ce5
JD
2057 return -ENOSYS;
2058 }
2059}
2060
3bd1e081 2061int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
28ab034a
JG
2062 int sock,
2063 struct pollfd *consumer_sockpoll)
3bd1e081 2064{
fa29bfbf 2065 switch (the_consumer_data.type) {
3bd1e081
MD
2066 case LTTNG_CONSUMER_KERNEL:
2067 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
2068 case LTTNG_CONSUMER32_UST:
2069 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
2070 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2071 default:
2072 ERR("Unknown consumer_data type");
a0377dfe 2073 abort();
3bd1e081
MD
2074 return -ENOSYS;
2075 }
2076}
2077
cd9adb8b 2078static void lttng_consumer_close_all_metadata()
d88aee68 2079{
fa29bfbf 2080 switch (the_consumer_data.type) {
d88aee68
DG
2081 case LTTNG_CONSUMER_KERNEL:
2082 /*
2083 * The Kernel consumer has a different metadata scheme so we don't
2084 * close anything because the stream will be closed by the session
2085 * daemon.
2086 */
2087 break;
2088 case LTTNG_CONSUMER32_UST:
2089 case LTTNG_CONSUMER64_UST:
2090 /*
2091 * Close all metadata streams. The metadata hash table is passed and
2092 * this call iterates over it by closing all wakeup fd. This is safe
2093 * because at this point we are sure that the metadata producer is
2094 * either dead or blocked.
2095 */
6d574024 2096 lttng_ustconsumer_close_all_metadata(metadata_ht);
d88aee68
DG
2097 break;
2098 default:
2099 ERR("Unknown consumer_data type");
a0377dfe 2100 abort();
d88aee68
DG
2101 }
2102}
2103
fb3a43a9
DG
2104/*
2105 * Clean up a metadata stream and free its memory.
2106 */
28ab034a 2107void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
fb3a43a9 2108{
cd9adb8b 2109 struct lttng_consumer_channel *channel = nullptr;
a6ef8ee6 2110 bool free_channel = false;
fb3a43a9 2111
a0377dfe 2112 LTTNG_ASSERT(stream);
fb3a43a9
DG
2113 /*
2114 * This call should NEVER receive regular stream. It must always be
2115 * metadata stream and this is crucial for data structure synchronization.
2116 */
a0377dfe 2117 LTTNG_ASSERT(stream->metadata_flag);
fb3a43a9 2118
e316aad5
DG
2119 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
2120
fa29bfbf 2121 pthread_mutex_lock(&the_consumer_data.lock);
a6ef8ee6
JG
2122 /*
2123 * Note that this assumes that a stream's channel is never changed and
2124 * that the stream's lock doesn't need to be taken to sample its
2125 * channel.
2126 */
2127 channel = stream->chan;
2128 pthread_mutex_lock(&channel->lock);
3dad2c0f 2129 pthread_mutex_lock(&stream->lock);
a6ef8ee6 2130 if (channel->metadata_cache) {
081424af 2131 /* Only applicable to userspace consumers. */
a6ef8ee6 2132 pthread_mutex_lock(&channel->metadata_cache->lock);
081424af 2133 }
8994307f 2134
6d574024
DG
2135 /* Remove any reference to that stream. */
2136 consumer_stream_delete(stream, ht);
ca22feea 2137
6d574024 2138 /* Close down everything including the relayd if one. */
d119bd01 2139 consumer_stream_close_output(stream);
6d574024
DG
2140 /* Destroy tracer buffers of the stream. */
2141 consumer_stream_destroy_buffers(stream);
fb3a43a9
DG
2142
2143 /* Atomically decrement channel refcount since other threads can use it. */
28ab034a
JG
2144 if (!uatomic_sub_return(&channel->refcount, 1) &&
2145 !uatomic_read(&channel->nb_init_stream_left)) {
c30aaa51 2146 /* Go for channel deletion! */
a6ef8ee6 2147 free_channel = true;
fb3a43a9 2148 }
cd9adb8b 2149 stream->chan = nullptr;
fb3a43a9 2150
73811ecc
DG
2151 /*
2152 * Nullify the stream reference so it is not used after deletion. The
6d574024
DG
2153 * channel lock MUST be acquired before being able to check for a NULL
2154 * pointer value.
73811ecc 2155 */
cd9adb8b 2156 channel->metadata_stream = nullptr;
73811ecc 2157
a6ef8ee6
JG
2158 if (channel->metadata_cache) {
2159 pthread_mutex_unlock(&channel->metadata_cache->lock);
081424af 2160 }
3dad2c0f 2161 pthread_mutex_unlock(&stream->lock);
a6ef8ee6 2162 pthread_mutex_unlock(&channel->lock);
fa29bfbf 2163 pthread_mutex_unlock(&the_consumer_data.lock);
e316aad5 2164
a6ef8ee6
JG
2165 if (free_channel) {
2166 consumer_del_channel(channel);
e316aad5
DG
2167 }
2168
d2956687 2169 lttng_trace_chunk_put(stream->trace_chunk);
cd9adb8b 2170 stream->trace_chunk = nullptr;
6d574024 2171 consumer_stream_free(stream);
fb3a43a9
DG
2172}
2173
2174/*
2175 * Action done with the metadata stream when adding it to the consumer internal
2176 * data structures to handle it.
2177 */
66d583dc 2178void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
fb3a43a9 2179{
5ab66908 2180 struct lttng_ht *ht = metadata_ht;
76082088 2181 struct lttng_ht_iter iter;
d88aee68 2182 struct lttng_ht_node_u64 *node;
fb3a43a9 2183
a0377dfe
FD
2184 LTTNG_ASSERT(stream);
2185 LTTNG_ASSERT(ht);
e316aad5 2186
d88aee68 2187 DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
e316aad5 2188
fa29bfbf 2189 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 2190 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 2191 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 2192 pthread_mutex_lock(&stream->lock);
e316aad5 2193
e316aad5
DG
2194 /*
2195 * From here, refcounts are updated so be _careful_ when returning an error
2196 * after this point.
2197 */
2198
56047f5a 2199 lttng::urcu::read_lock_guard read_lock;
76082088
DG
2200
2201 /*
2202 * Lookup the stream just to make sure it does not exist in our internal
2203 * state. This should NEVER happen.
2204 */
d88aee68
DG
2205 lttng_ht_lookup(ht, &stream->key, &iter);
2206 node = lttng_ht_iter_get_node_u64(&iter);
a0377dfe 2207 LTTNG_ASSERT(!node);
76082088 2208
e316aad5 2209 /*
ffe60014
DG
2210 * When nb_init_stream_left reaches 0, we don't need to trigger any action
2211 * in terms of destroying the associated channel, because the action that
e316aad5
DG
2212 * causes the count to become 0 also causes a stream to be added. The
2213 * channel deletion will thus be triggered by the following removal of this
2214 * stream.
2215 */
ffe60014 2216 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
2217 /* Increment refcount before decrementing nb_init_stream_left */
2218 cmm_smp_wmb();
ffe60014 2219 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
2220 }
2221
d88aee68 2222 lttng_ht_add_unique_u64(ht, &stream->node);
ca22feea 2223
28ab034a 2224 lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
d8ef542d 2225
ca22feea
DG
2226 /*
2227 * Add stream to the stream_list_ht of the consumer data. No need to steal
2228 * the key since the HT does not use it and we allow to add redundant keys
2229 * into this table.
2230 */
28ab034a 2231 lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 2232
2e818a6a 2233 pthread_mutex_unlock(&stream->lock);
a9838785 2234 pthread_mutex_unlock(&stream->chan->lock);
ec6ea7d0 2235 pthread_mutex_unlock(&stream->chan->timer_lock);
fa29bfbf 2236 pthread_mutex_unlock(&the_consumer_data.lock);
fb3a43a9
DG
2237}
2238
8994307f
DG
2239/*
2240 * Delete data stream that are flagged for deletion (endpoint_status).
2241 */
cd9adb8b 2242static void validate_endpoint_status_data_stream()
8994307f
DG
2243{
2244 struct lttng_ht_iter iter;
2245 struct lttng_consumer_stream *stream;
2246
2247 DBG("Consumer delete flagged data stream");
2248
56047f5a
JG
2249 {
2250 lttng::urcu::read_lock_guard read_lock;
2251
2252 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
2253 /* Validate delete flag of the stream */
2254 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2255 continue;
2256 }
2257 /* Delete it right now */
2258 consumer_del_stream(stream, data_ht);
8994307f 2259 }
8994307f 2260 }
8994307f
DG
2261}
2262
2263/*
2264 * Delete metadata stream that are flagged for deletion (endpoint_status).
2265 */
28ab034a 2266static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
8994307f
DG
2267{
2268 struct lttng_ht_iter iter;
2269 struct lttng_consumer_stream *stream;
2270
2271 DBG("Consumer delete flagged metadata stream");
2272
a0377dfe 2273 LTTNG_ASSERT(pollset);
8994307f 2274
56047f5a
JG
2275 {
2276 lttng::urcu::read_lock_guard read_lock;
2277 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
2278 /* Validate delete flag of the stream */
2279 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2280 continue;
2281 }
2282 /*
2283 * Remove from pollset so the metadata thread can continue without
2284 * blocking on a deleted stream.
2285 */
2286 lttng_poll_del(pollset, stream->wait_fd);
8994307f 2287
56047f5a
JG
2288 /* Delete it right now */
2289 consumer_del_metadata_stream(stream, metadata_ht);
2290 }
8994307f 2291 }
8994307f
DG
2292}
2293
fb3a43a9
DG
2294/*
2295 * Thread polls on metadata file descriptor and write them on disk or on the
2296 * network.
2297 */
7d980def 2298void *consumer_thread_metadata_poll(void *data)
fb3a43a9 2299{
1fc79fb4 2300 int ret, i, pollfd, err = -1;
fb3a43a9 2301 uint32_t revents, nb_fd;
cd9adb8b 2302 struct lttng_consumer_stream *stream = nullptr;
fb3a43a9 2303 struct lttng_ht_iter iter;
d88aee68 2304 struct lttng_ht_node_u64 *node;
fb3a43a9 2305 struct lttng_poll_event events;
97535efa 2306 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
fb3a43a9
DG
2307 ssize_t len;
2308
2309 rcu_register_thread();
2310
1fc79fb4
MD
2311 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2312
2d57de81
MD
2313 if (testpoint(consumerd_thread_metadata)) {
2314 goto error_testpoint;
2315 }
2316
9ce5646a
MD
2317 health_code_update();
2318
fb3a43a9
DG
2319 DBG("Thread metadata poll started");
2320
fb3a43a9
DG
2321 /* Size is set to 1 for the consumer_metadata pipe */
2322 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2323 if (ret < 0) {
2324 ERR("Poll set creation failed");
d8ef542d 2325 goto end_poll;
fb3a43a9
DG
2326 }
2327
28ab034a 2328 ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
fb3a43a9
DG
2329 if (ret < 0) {
2330 goto end;
2331 }
2332
2333 /* Main loop */
2334 DBG("Metadata main loop started");
2335
cd9adb8b 2336 while (true) {
28ab034a 2337 restart:
7fa2082e 2338 health_code_update();
9ce5646a 2339 health_poll_entry();
7fa2082e 2340 DBG("Metadata poll wait");
fb3a43a9 2341 ret = lttng_poll_wait(&events, -1);
28ab034a 2342 DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2343 health_poll_exit();
40063ead 2344 DBG("Metadata event caught in thread");
fb3a43a9
DG
2345 if (ret < 0) {
2346 if (errno == EINTR) {
40063ead 2347 ERR("Poll EINTR caught");
fb3a43a9
DG
2348 goto restart;
2349 }
d9607cd7 2350 if (LTTNG_POLL_GETNB(&events) == 0) {
28ab034a 2351 err = 0; /* All is OK */
d9607cd7
MD
2352 }
2353 goto end;
fb3a43a9
DG
2354 }
2355
0d9c5d77
DG
2356 nb_fd = ret;
2357
e316aad5 2358 /* From here, the event is a metadata wait fd */
fb3a43a9 2359 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2360 health_code_update();
2361
fb3a43a9
DG
2362 revents = LTTNG_POLL_GETEV(&events, i);
2363 pollfd = LTTNG_POLL_GETFD(&events, i);
2364
13886d2d 2365 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
03e43155 2366 if (revents & LPOLLIN) {
13886d2d
DG
2367 ssize_t pipe_len;
2368
2369 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
28ab034a 2370 &stream,
5c7248cd
JG
2371 sizeof(stream)); /* NOLINT sizeof
2372 used on a
2373 pointer. */
2374 if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
2375 pointer. */
03e43155
MD
2376 if (pipe_len < 0) {
2377 PERROR("read metadata stream");
2378 }
fb3a43a9 2379 /*
28ab034a
JG
2380 * Remove the pipe from the poll set and continue
2381 * the loop since their might be data to consume.
fb3a43a9 2382 */
28ab034a
JG
2383 lttng_poll_del(
2384 &events,
2385 lttng_pipe_get_readfd(
2386 ctx->consumer_metadata_pipe));
03e43155 2387 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
fb3a43a9
DG
2388 continue;
2389 }
2390
8994307f 2391 /* A NULL stream means that the state has changed. */
cd9adb8b 2392 if (stream == nullptr) {
8994307f
DG
2393 /* Check for deleted streams. */
2394 validate_endpoint_status_metadata_stream(&events);
3714380f 2395 goto restart;
8994307f
DG
2396 }
2397
fb3a43a9 2398 DBG("Adding metadata stream %d to poll set",
28ab034a 2399 stream->wait_fd);
fb3a43a9 2400
fb3a43a9 2401 /* Add metadata stream to the global poll events list */
28ab034a
JG
2402 lttng_poll_add(
2403 &events, stream->wait_fd, LPOLLIN | LPOLLPRI);
2404 } else if (revents & (LPOLLERR | LPOLLHUP)) {
03e43155
MD
2405 DBG("Metadata thread pipe hung up");
2406 /*
2407 * Remove the pipe from the poll set and continue the loop
2408 * since their might be data to consume.
2409 */
28ab034a
JG
2410 lttng_poll_del(
2411 &events,
2412 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
03e43155
MD
2413 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2414 continue;
2415 } else {
28ab034a
JG
2416 ERR("Unexpected poll events %u for sock %d",
2417 revents,
2418 pollfd);
03e43155 2419 goto end;
fb3a43a9
DG
2420 }
2421
e316aad5 2422 /* Handle other stream */
fb3a43a9
DG
2423 continue;
2424 }
2425
56047f5a 2426 lttng::urcu::read_lock_guard read_lock;
d88aee68
DG
2427 {
2428 uint64_t tmp_id = (uint64_t) pollfd;
2429
2430 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2431 }
2432 node = lttng_ht_iter_get_node_u64(&iter);
a0377dfe 2433 LTTNG_ASSERT(node);
fb3a43a9 2434
28ab034a 2435 stream = caa_container_of(node, struct lttng_consumer_stream, node);
fb3a43a9 2436
03e43155
MD
2437 if (revents & (LPOLLIN | LPOLLPRI)) {
2438 /* Get the data out of the metadata file descriptor */
2439 DBG("Metadata available on fd %d", pollfd);
a0377dfe 2440 LTTNG_ASSERT(stream->wait_fd == pollfd);
03e43155
MD
2441
2442 do {
2443 health_code_update();
2444
6f9449c2 2445 len = ctx->on_buffer_ready(stream, ctx, false);
03e43155
MD
2446 /*
2447 * We don't check the return value here since if we get
83f4233d 2448 * a negative len, it means an error occurred thus we
03e43155
MD
2449 * simply remove it from the poll set and free the
2450 * stream.
2451 */
2452 } while (len > 0);
2453
2454 /* It's ok to have an unavailable sub-buffer */
2455 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2456 /* Clean up stream from consumer and free it. */
2457 lttng_poll_del(&events, stream->wait_fd);
2458 consumer_del_metadata_stream(stream, metadata_ht);
2459 }
2460 } else if (revents & (LPOLLERR | LPOLLHUP)) {
e316aad5 2461 DBG("Metadata fd %d is hup|err.", pollfd);
fa29bfbf 2462 if (!stream->hangup_flush_done &&
28ab034a
JG
2463 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2464 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
fb3a43a9
DG
2465 DBG("Attempting to flush and consume the UST buffers");
2466 lttng_ustconsumer_on_stream_hangup(stream);
2467
2468 /* We just flushed the stream now read it. */
4bb94b75 2469 do {
9ce5646a
MD
2470 health_code_update();
2471
6f9449c2 2472 len = ctx->on_buffer_ready(stream, ctx, false);
4bb94b75 2473 /*
28ab034a
JG
2474 * We don't check the return value here since if we
2475 * get a negative len, it means an error occurred
2476 * thus we simply remove it from the poll set and
2477 * free the stream.
4bb94b75
DG
2478 */
2479 } while (len > 0);
fb3a43a9
DG
2480 }
2481
fb3a43a9 2482 lttng_poll_del(&events, stream->wait_fd);
e316aad5
DG
2483 /*
2484 * This call update the channel states, closes file descriptors
2485 * and securely free the stream.
2486 */
2487 consumer_del_metadata_stream(stream, metadata_ht);
03e43155
MD
2488 } else {
2489 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
03e43155 2490 goto end;
fb3a43a9 2491 }
e316aad5 2492 /* Release RCU lock for the stream looked up */
fb3a43a9
DG
2493 }
2494 }
2495
1fc79fb4
MD
2496 /* All is OK */
2497 err = 0;
fb3a43a9
DG
2498end:
2499 DBG("Metadata poll thread exiting");
fb3a43a9 2500
d8ef542d
MD
2501 lttng_poll_clean(&events);
2502end_poll:
2d57de81 2503error_testpoint:
1fc79fb4
MD
2504 if (err) {
2505 health_error();
2506 ERR("Health error occurred in %s", __func__);
2507 }
2508 health_unregister(health_consumerd);
fb3a43a9 2509 rcu_unregister_thread();
cd9adb8b 2510 return nullptr;
fb3a43a9
DG
2511}
2512
3bd1e081 2513/*
e4421fec 2514 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2515 * it to tracefile if necessary.
2516 */
7d980def 2517void *consumer_thread_data_poll(void *data)
3bd1e081 2518{
ff930959 2519 int num_rdy, high_prio, ret, i, err = -1;
cd9adb8b 2520 struct pollfd *pollfd = nullptr;
3bd1e081 2521 /* local view of the streams */
cd9adb8b 2522 struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
3bd1e081 2523 /* local view of consumer_data.fds_count */
8bdcc002
JG
2524 int nb_fd = 0;
2525 /* 2 for the consumer_data_pipe and wake up pipe */
2526 const int nb_pipes_fd = 2;
9a2fcf78
JD
2527 /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
2528 int nb_inactive_fd = 0;
97535efa 2529 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
00e2e675 2530 ssize_t len;
3bd1e081 2531
e7b994a3
DG
2532 rcu_register_thread();
2533
1fc79fb4
MD
2534 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2535
2d57de81
MD
2536 if (testpoint(consumerd_thread_data)) {
2537 goto error_testpoint;
2538 }
2539
9ce5646a
MD
2540 health_code_update();
2541
64803277 2542 local_stream = zmalloc<lttng_consumer_stream *>();
cd9adb8b 2543 if (local_stream == nullptr) {
4df6c8cb
MD
2544 PERROR("local_stream malloc");
2545 goto end;
2546 }
3bd1e081 2547
cd9adb8b 2548 while (true) {
9ce5646a
MD
2549 health_code_update();
2550
3bd1e081 2551 high_prio = 0;
3bd1e081
MD
2552
2553 /*
e4421fec 2554 * the fds set has been updated, we need to update our
3bd1e081
MD
2555 * local array as well
2556 */
fa29bfbf
SM
2557 pthread_mutex_lock(&the_consumer_data.lock);
2558 if (the_consumer_data.need_update) {
0e428499 2559 free(pollfd);
cd9adb8b 2560 pollfd = nullptr;
0e428499
DG
2561
2562 free(local_stream);
cd9adb8b 2563 local_stream = nullptr;
3bd1e081 2564
8bdcc002 2565 /* Allocate for all fds */
28ab034a
JG
2566 pollfd =
2567 calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2568 if (pollfd == nullptr) {
7a57cf92 2569 PERROR("pollfd malloc");
fa29bfbf 2570 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2571 goto end;
2572 }
2573
28ab034a
JG
2574 local_stream = calloc<lttng_consumer_stream *>(
2575 the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2576 if (local_stream == nullptr) {
7a57cf92 2577 PERROR("local_stream malloc");
fa29bfbf 2578 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2579 goto end;
2580 }
28ab034a
JG
2581 ret = update_poll_array(
2582 ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
3bd1e081
MD
2583 if (ret < 0) {
2584 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2585 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
fa29bfbf 2586 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2587 goto end;
2588 }
2589 nb_fd = ret;
fa29bfbf 2590 the_consumer_data.need_update = 0;
3bd1e081 2591 }
fa29bfbf 2592 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081 2593
4078b776 2594 /* No FDs and consumer_quit, consumer_cleanup the thread */
28ab034a
JG
2595 if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
2596 err = 0; /* All is OK */
4078b776
MD
2597 goto end;
2598 }
3bd1e081 2599 /* poll on the array of fds */
88f2b785 2600 restart:
261de637 2601 DBG("polling on %d fd", nb_fd + nb_pipes_fd);
cf0bcb51
JG
2602 if (testpoint(consumerd_thread_data_poll)) {
2603 goto end;
2604 }
9ce5646a 2605 health_poll_entry();
261de637 2606 num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
9ce5646a 2607 health_poll_exit();
3bd1e081
MD
2608 DBG("poll num_rdy : %d", num_rdy);
2609 if (num_rdy == -1) {
88f2b785
MD
2610 /*
2611 * Restart interrupted system call.
2612 */
2613 if (errno == EINTR) {
2614 goto restart;
2615 }
7a57cf92 2616 PERROR("Poll error");
f73fabfd 2617 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2618 goto end;
2619 } else if (num_rdy == 0) {
2620 DBG("Polling thread timed out");
2621 goto end;
2622 }
2623
80957876
JG
2624 if (caa_unlikely(data_consumption_paused)) {
2625 DBG("Data consumption paused, sleeping...");
2626 sleep(1);
2627 goto restart;
2628 }
2629
3bd1e081 2630 /*
50f8ae69 2631 * If the consumer_data_pipe triggered poll go directly to the
00e2e675
DG
2632 * beginning of the loop to update the array. We want to prioritize
2633 * array update over low-priority reads.
3bd1e081 2634 */
509bb1cf 2635 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
ab30f567 2636 ssize_t pipe_readlen;
04fdd819 2637
50f8ae69 2638 DBG("consumer_data_pipe wake up");
5c7248cd
JG
2639 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2640 &new_stream,
2641 sizeof(new_stream)); /* NOLINT sizeof used on
2642 a pointer. */
2643 if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
2644 */
6cd525e8 2645 PERROR("Consumer data pipe");
23f5f35d
DG
2646 /* Continue so we can at least handle the current stream(s). */
2647 continue;
2648 }
c869f647
DG
2649
2650 /*
2651 * If the stream is NULL, just ignore it. It's also possible that
2652 * the sessiond poll thread changed the consumer_quit state and is
2653 * waking us up to test it.
2654 */
cd9adb8b 2655 if (new_stream == nullptr) {
8994307f 2656 validate_endpoint_status_data_stream();
c869f647
DG
2657 continue;
2658 }
2659
c869f647 2660 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2661 continue;
2662 }
2663
02b3d176
DG
2664 /* Handle wakeup pipe. */
2665 if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
2666 char dummy;
2667 ssize_t pipe_readlen;
2668
28ab034a
JG
2669 pipe_readlen =
2670 lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
02b3d176
DG
2671 if (pipe_readlen < 0) {
2672 PERROR("Consumer data wakeup pipe");
2673 }
2674 /* We've been awakened to handle stream(s). */
2675 ctx->has_wakeup = 0;
2676 }
2677
3bd1e081
MD
2678 /* Take care of high priority channels first. */
2679 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2680 health_code_update();
2681
cd9adb8b 2682 if (local_stream[i] == nullptr) {
9617607b
DG
2683 continue;
2684 }
fb3a43a9 2685 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2686 DBG("Urgent read on fd %d", pollfd[i].fd);
2687 high_prio = 1;
6f9449c2 2688 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
d41f73b7 2689 /* it's ok to have an unavailable sub-buffer */
b64403e3 2690 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2691 /* Clean the stream and free it. */
2692 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2693 local_stream[i] = nullptr;
4078b776 2694 } else if (len > 0) {
28ab034a
JG
2695 local_stream[i]->has_data_left_to_be_read_before_teardown =
2696 1;
d41f73b7 2697 }
3bd1e081
MD
2698 }
2699 }
2700
4078b776
MD
2701 /*
2702 * If we read high prio channel in this loop, try again
2703 * for more high prio data.
2704 */
2705 if (high_prio) {
3bd1e081
MD
2706 continue;
2707 }
2708
2709 /* Take care of low priority channels. */
4078b776 2710 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2711 health_code_update();
2712
cd9adb8b 2713 if (local_stream[i] == nullptr) {
9617607b
DG
2714 continue;
2715 }
28ab034a
JG
2716 if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
2717 local_stream[i]->has_data) {
4078b776 2718 DBG("Normal read on fd %d", pollfd[i].fd);
6f9449c2 2719 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
4078b776 2720 /* it's ok to have an unavailable sub-buffer */
b64403e3 2721 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2722 /* Clean the stream and free it. */
2723 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2724 local_stream[i] = nullptr;
4078b776 2725 } else if (len > 0) {
28ab034a
JG
2726 local_stream[i]->has_data_left_to_be_read_before_teardown =
2727 1;
4078b776
MD
2728 }
2729 }
2730 }
2731
2732 /* Handle hangup and errors */
2733 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2734 health_code_update();
2735
cd9adb8b 2736 if (local_stream[i] == nullptr) {
9617607b
DG
2737 continue;
2738 }
28ab034a
JG
2739 if (!local_stream[i]->hangup_flush_done &&
2740 (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
2741 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2742 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
4078b776 2743 DBG("fd %d is hup|err|nval. Attempting flush and read.",
28ab034a 2744 pollfd[i].fd);
4078b776
MD
2745 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2746 /* Attempt read again, for the data we just flushed. */
c715ddc9 2747 local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
4078b776
MD
2748 }
2749 /*
c715ddc9
JG
2750 * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
2751 * performed. This type of flush ensures that a new packet is produced no
2752 * matter the consumed/produced positions are.
2753 *
2754 * This, in turn, causes the next pass to see that data available for the
2755 * stream. When we come back here, we can be assured that all available
2756 * data has been consumed and we can finally destroy the stream.
2757 *
4078b776
MD
2758 * If the poll flag is HUP/ERR/NVAL and we have
2759 * read no data in this pass, we can remove the
2760 * stream from its hash table.
2761 */
2762 if ((pollfd[i].revents & POLLHUP)) {
2763 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
c715ddc9 2764 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2765 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2766 local_stream[i] = nullptr;
4078b776
MD
2767 }
2768 } else if (pollfd[i].revents & POLLERR) {
2769 ERR("Error returned in polling fd %d.", pollfd[i].fd);
c715ddc9 2770 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2771 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2772 local_stream[i] = nullptr;
4078b776
MD
2773 }
2774 } else if (pollfd[i].revents & POLLNVAL) {
2775 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
c715ddc9 2776 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2777 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2778 local_stream[i] = nullptr;
3bd1e081
MD
2779 }
2780 }
cd9adb8b 2781 if (local_stream[i] != nullptr) {
c715ddc9 2782 local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
9617607b 2783 }
3bd1e081
MD
2784 }
2785 }
1fc79fb4
MD
2786 /* All is OK */
2787 err = 0;
3bd1e081
MD
2788end:
2789 DBG("polling thread exiting");
0e428499
DG
2790 free(pollfd);
2791 free(local_stream);
fb3a43a9
DG
2792
2793 /*
2794 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2795 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2796 * read side of the pipe. If we close them both, epoll_wait strangely does
2797 * not return and could create a endless wait period if the pipe is the
2798 * only tracked fd in the poll set. The thread will take care of closing
2799 * the read side.
fb3a43a9 2800 */
13886d2d 2801 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
fb3a43a9 2802
2d57de81 2803error_testpoint:
1fc79fb4
MD
2804 if (err) {
2805 health_error();
2806 ERR("Health error occurred in %s", __func__);
2807 }
2808 health_unregister(health_consumerd);
2809
e7b994a3 2810 rcu_unregister_thread();
cd9adb8b 2811 return nullptr;
3bd1e081
MD
2812}
2813
d8ef542d
MD
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 *
 * Looks up all streams of `channel` in the global stream-per-channel-id
 * hash table and, for UST consumers, closes either the metadata channel
 * or the stream wakeup fd. Kernel streams need no action here.
 */
static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	ht = the_consumer_data.stream_per_chan_id_ht;

	/* RCU read-side lock held for the whole duplicate-key iteration. */
	lttng::urcu::read_lock_guard read_lock;
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* Skip streams already unlinked by a concurrent teardown. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		switch (the_consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			if (stream->metadata_flag) {
				/* Safe and protected by the stream lock. */
				lttng_ustconsumer_close_metadata(stream->chan);
			} else {
				/*
				 * Note: a mutex is taken internally within
				 * liblttng-ust-ctl to protect timer wakeup_fd
				 * use from concurrent close.
				 */
				lttng_ustconsumer_close_stream_wakeup(stream);
			}
			break;
		default:
			ERR("Unknown consumer_data type");
			abort();
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
}
2868
/*
 * Remove every channel node from the given hash table and destroy it.
 * Safe to call with a NULL table (no-op).
 */
static void destroy_channel_ht(struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	int ret;

	if (ht == nullptr) {
		return;
	}

	{
		/* RCU read-side lock scoped to the iteration only. */
		lttng::urcu::read_lock_guard read_lock;

		cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
			ret = lttng_ht_del(ht, &iter);
			/*
			 * NOTE(review): this asserts a NON-zero return from
			 * lttng_ht_del, which usually signals failure to
			 * delete — verify against lttng_ht_del()'s contract;
			 * a successful delete would normally return 0.
			 */
			LTTNG_ASSERT(ret != 0);
		}
	}

	lttng_ht_destroy(ht);
}
2890
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 *
 * `data` is the lttng_consumer_local_data context. Always returns
 * nullptr; health state is reported through the health subsystem.
 */
void *consumer_thread_channel_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	struct lttng_ht *channel_ht;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

	if (testpoint(consumerd_thread_channel)) {
		goto error_testpoint;
	}

	health_code_update();

	/* Local table mapping wait fd -> channel for this thread. */
	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Size is set to 1 for the consumer_channel pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (true) {
	restart:
		health_code_update();
		DBG("Channel poll wait");
		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Channel event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				err = 0; /* All is OK */
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & LPOLLIN) {
					/* Command received from another thread. */
					enum consumer_channel_action action;
					uint64_t key;

					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						if (ret < 0) {
							ERR("Error reading channel pipe");
						}
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
					{
						DBG("Adding channel %d to poll set", chan->wait_fd);

						lttng_ht_node_init_u64(&chan->wait_fd_node,
								       chan->wait_fd);
						lttng::urcu::read_lock_guard read_lock;
						lttng_ht_add_unique_u64(channel_ht,
									&chan->wait_fd_node);
						/* Add channel to the global poll events list */
						// FIXME: Empty flag on a pipe pollset, this might
						// hang on FreeBSD.
						lttng_poll_add(&events, chan->wait_fd, 0);
						break;
					}
					case CONSUMER_CHANNEL_DEL:
					{
						/*
						 * This command should never be called if the
						 * channel has streams monitored by either the data
						 * or metadata thread. The consumer only notify this
						 * thread with a channel del. command if it receives
						 * a destroy channel command from the session daemon
						 * that send it if a command prior to the
						 * GET_CHANNEL failed.
						 */

						lttng::urcu::read_lock_guard read_lock;
						chan = consumer_find_channel(key);
						if (!chan) {
							ERR("UST consumer get channel key %" PRIu64
							    " not found for del channel",
							    key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						LTTNG_ASSERT(ret == 0);

						switch (the_consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							health_code_update();
							/* Destroy streams that might have been left
							 * in the stream list. */
							clean_channel_stream_list(chan);
							break;
						default:
							ERR("Unknown consumer_data type");
							abort();
						}

						/*
						 * Release our own refcount. Force channel deletion
						 * even if streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						/* Poll set changed; restart the wait. */
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since their might be data to consume.
						 */
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/* Not the command pipe: resolve the channel by wait fd. */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				LTTNG_ASSERT(ret == 0);

				/*
				 * This will close the wait fd for each stream associated to
				 * this channel AND monitored by the data/metadata thread thus
				 * will be clean by the right thread.
				 */
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1) &&
				    !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}

			/* Release RCU lock for the channel looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
error_testpoint:
	DBG("Channel poll thread exiting");
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
3139
331744e3 3140static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
28ab034a
JG
3141 struct pollfd *sockpoll,
3142 int client_socket)
331744e3
JD
3143{
3144 int ret;
3145
a0377dfe
FD
3146 LTTNG_ASSERT(ctx);
3147 LTTNG_ASSERT(sockpoll);
331744e3 3148
84382d49
MD
3149 ret = lttng_consumer_poll_socket(sockpoll);
3150 if (ret) {
331744e3
JD
3151 goto error;
3152 }
3153 DBG("Metadata connection on client_socket");
3154
3155 /* Blocking call, waiting for transmission */
3156 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3157 if (ctx->consumer_metadata_socket < 0) {
3158 WARN("On accept metadata");
3159 ret = -1;
3160 goto error;
3161 }
3162 ret = 0;
3163
3164error:
3165 return ret;
3166}
3167
3bd1e081
MD
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 *
 * Lifecycle: create/listen on the command unix socket, tell the session
 * daemon we are ready, accept the command and metadata connections, then
 * loop receiving commands until quit or disconnection. On exit, all
 * metadata is closed and the other consumer threads are notified to quit.
 */
void *consumer_thread_sessiond_poll(void *data)
{
	int sock = -1, client_socket, ret, err = -1;
	/*
	 * structure to poll for incoming data on communication socket avoids
	 * making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	if (testpoint(consumerd_thread_sessiond)) {
		goto error_testpoint;
	}

	health_code_update();

	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	client_socket = -1;

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	while (true) {
		health_code_update();

		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			if (ret > 0) {
				/* should exit */
				err = 0;
			}
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (CMM_LOAD_SHARED(consumer_quit)) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0; /* All is OK */
			goto end;
		}
		DBG("Received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_all_metadata();

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	CMM_STORE_SHARED(consumer_quit, 1);

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);

	notify_health_quit_pipe(health_quit_pipe);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return nullptr;
}
d41f73b7 3345
503fefca 3346static int post_consume(struct lttng_consumer_stream *stream,
28ab034a
JG
3347 const struct stream_subbuffer *subbuffer,
3348 struct lttng_consumer_local_data *ctx)
f96af312 3349{
503fefca 3350 size_t i;
f96af312 3351 int ret = 0;
28ab034a
JG
3352 const size_t count =
3353 lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
f96af312 3354
503fefca
JG
3355 for (i = 0; i < count; i++) {
3356 const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
28ab034a 3357 &stream->read_subbuffer_ops.post_consume_cbs, i);
503fefca
JG
3358
3359 ret = op(stream, subbuffer, ctx);
3360 if (ret) {
3361 goto end;
f96af312 3362 }
f96af312 3363 }
f96af312
JG
3364end:
3365 return ret;
3366}
3367
/*
 * Consume one sub-buffer from `stream` using the stream's configured
 * read_subbuffer_ops: acquire the next sub-buffer, run the pre-consume
 * hook, write the data out, release the sub-buffer and run the
 * post-consume callbacks. Handles pending stream rotations both before
 * and after the packet is extracted.
 *
 * If `locked_by_caller` is false the stream lock is taken/released here;
 * otherwise the caller must already hold it (asserted).
 *
 * Returns the number of bytes consumed on success, 0 when no data is
 * available, or a negative value on error.
 */
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
				      struct lttng_consumer_local_data *ctx,
				      bool locked_by_caller)
{
	ssize_t ret, written_bytes = 0;
	int rotation_ret;
	struct stream_subbuffer subbuffer = {};
	enum get_next_subbuffer_status get_next_status;

	if (!locked_by_caller) {
		stream->read_subbuffer_ops.lock(stream);
	} else {
		stream->read_subbuffer_ops.assert_locked(stream);
	}

	if (stream->read_subbuffer_ops.on_wake_up) {
		ret = stream->read_subbuffer_ops.on_wake_up(stream);
		if (ret) {
			goto end;
		}
	}

	/*
	 * If the stream was flagged to be ready for rotation before we extract
	 * the next packet, rotate it now.
	 */
	if (stream->rotate_ready) {
		DBG("Rotate stream before consuming data");
		ret = lttng_consumer_rotate_stream(stream);
		if (ret < 0) {
			ERR("Stream rotation error before consuming data");
			goto end;
		}
	}

	get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
	switch (get_next_status) {
	case GET_NEXT_SUBBUFFER_STATUS_OK:
		break;
	case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
		/* Not an error. */
		ret = 0;
		goto sleep_stream;
	case GET_NEXT_SUBBUFFER_STATUS_ERROR:
		ret = -1;
		goto end;
	default:
		abort();
	}

	ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
	if (ret) {
		goto error_put_subbuf;
	}

	written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
	if (written_bytes <= 0) {
		ERR("Error consuming subbuffer: (%zd)", written_bytes);
		ret = (int) written_bytes;
		goto error_put_subbuf;
	}

	ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	if (ret) {
		goto end;
	}

	ret = post_consume(stream, &subbuffer, ctx);
	if (ret) {
		goto end;
	}

	/*
	 * After extracting the packet, we check if the stream is now ready to
	 * be rotated and perform the action immediately.
	 *
	 * Don't overwrite `ret` as callers expect the number of bytes
	 * consumed to be returned on success.
	 */
	rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
	if (rotation_ret == 1) {
		rotation_ret = lttng_consumer_rotate_stream(stream);
		if (rotation_ret < 0) {
			ret = rotation_ret;
			ERR("Stream rotation error after consuming data");
			goto end;
		}

	} else if (rotation_ret < 0) {
		ret = rotation_ret;
		ERR("Failed to check if stream was ready to rotate after consuming data");
		goto end;
	}

sleep_stream:
	if (stream->read_subbuffer_ops.on_sleep) {
		stream->read_subbuffer_ops.on_sleep(stream, ctx);
	}

	/* Success path: report the number of bytes written out. */
	ret = written_bytes;
end:
	if (!locked_by_caller) {
		stream->read_subbuffer_ops.unlock(stream);
	}

	return ret;
error_put_subbuf:
	/* Best-effort release of the acquired sub-buffer before bailing out. */
	(void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	goto end;
}
3478
3479int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3480{
fa29bfbf 3481 switch (the_consumer_data.type) {
d41f73b7
MD
3482 case LTTNG_CONSUMER_KERNEL:
3483 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
3484 case LTTNG_CONSUMER32_UST:
3485 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
3486 return lttng_ustconsumer_on_recv_stream(stream);
3487 default:
3488 ERR("Unknown consumer_data type");
a0377dfe 3489 abort();
d41f73b7
MD
3490 return -ENOSYS;
3491 }
3492}
e4421fec
DG
3493
3494/*
3495 * Allocate and set consumer data hash tables.
3496 */
cd9adb8b 3497int lttng_consumer_init()
e4421fec 3498{
fa29bfbf
SM
3499 the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3500 if (!the_consumer_data.channel_ht) {
282dadbc
MD
3501 goto error;
3502 }
3503
28ab034a 3504 the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3505 if (!the_consumer_data.channels_by_session_id_ht) {
5c3892a6
JG
3506 goto error;
3507 }
3508
fa29bfbf
SM
3509 the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3510 if (!the_consumer_data.relayd_ht) {
282dadbc
MD
3511 goto error;
3512 }
3513
fa29bfbf
SM
3514 the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3515 if (!the_consumer_data.stream_list_ht) {
282dadbc
MD
3516 goto error;
3517 }
3518
28ab034a 3519 the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3520 if (!the_consumer_data.stream_per_chan_id_ht) {
282dadbc
MD
3521 goto error;
3522 }
3523
3524 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3525 if (!data_ht) {
3526 goto error;
3527 }
3528
3529 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3530 if (!metadata_ht) {
3531 goto error;
3532 }
3533
fa29bfbf
SM
3534 the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3535 if (!the_consumer_data.chunk_registry) {
28cc88f3
JG
3536 goto error;
3537 }
3538
282dadbc
MD
3539 return 0;
3540
3541error:
3542 return -1;
e4421fec 3543}
7735ef9e
DG
/*
 * Process the ADD_RELAYD command receive by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire a RCU read side lock before calling it.
 *
 * Ownership of the received fd is transferred to the relayd structure on
 * success; on failure the fd is closed here. Status is always reported
 * back to the session daemon over `sock` unless it stopped responding.
 */
void consumer_add_relayd_socket(uint64_t net_seq_idx,
				int sock_type,
				struct lttng_consumer_local_data *ctx,
				int sock,
				struct pollfd *consumer_sockpoll,
				uint64_t sessiond_id,
				uint64_t relayd_session_id,
				uint32_t relayd_version_major,
				uint32_t relayd_version_minor,
				enum lttcomm_sock_proto relayd_socket_protocol)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct consumer_relayd_sock_pair *relayd = nullptr;

	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(sock >= 0);
	ASSERT_RCU_READ_LOCKED();

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == nullptr) {
		/* The control socket is always the first one established. */
		LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == nullptr) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status message to
		 * we can notify the session daemon and continue our work without
		 * killing everything.
		 */
	} else {
		/*
		 * relayd key should never be found for control socket.
		 */
		LTTNG_ASSERT(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		/* Needing to exit in the middle of a command: error. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		fd = -1; /* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
		 * don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->control_sock.sock, fd, relayd_socket_protocol);

		/* Assign version values. */
		relayd->control_sock.major = relayd_version_major;
		relayd->control_sock.minor = relayd_version_minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->data_sock.sock, fd, relayd_socket_protocol);
		/* Assign version values. */
		relayd->data_sock.major = relayd_version_major;
		relayd->data_sock.minor = relayd_version_minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	/* Check the populate-socket result from either branch above. */
	if (ret < 0) {
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
	    sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
	    relayd->net_seq_idx,
	    fd);
	/*
	 * We gave the ownership of the fd to the relayd structure. Set the
	 * fd to -1 so we don't call close() on it in the error path below.
	 */
	fd = -1;

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	relayd->ctx = ctx;
	add_relayd(relayd);

	/* All good! */
	return;

error:
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	if (relayd_created) {
		free(relayd);
	}
}
ca22feea 3711
f7079f67
DG
3712/*
3713 * Search for a relayd associated to the session id and return the reference.
3714 *
3715 * A rcu read side lock MUST be acquire before calling this function and locked
3716 * until the relayd object is no longer necessary.
3717 */
3718static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3719{
3720 struct lttng_ht_iter iter;
cd9adb8b 3721 struct consumer_relayd_sock_pair *relayd = nullptr;
f7079f67 3722
48b7cdc2
FD
3723 ASSERT_RCU_READ_LOCKED();
3724
f7079f67 3725 /* Iterate over all relayd since they are indexed by net_seq_idx. */
28ab034a 3726 cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
18261bd1
DG
3727 /*
3728 * Check by sessiond id which is unique here where the relayd session
3729 * id might not be when having multiple relayd.
3730 */
3731 if (relayd->sessiond_session_id == id) {
f7079f67 3732 /* Found the relayd. There can be only one per id. */
18261bd1 3733 goto found;
f7079f67
DG
3734 }
3735 }
3736
cd9adb8b 3737 return nullptr;
18261bd1
DG
3738
3739found:
f7079f67
DG
3740 return relayd;
3741}
3742
ca22feea
DG
3743/*
3744 * Check if for a given session id there is still data needed to be extract
3745 * from the buffers.
3746 *
6d805429 3747 * Return 1 if data is pending or else 0 meaning ready to be read.
ca22feea 3748 */
6d805429 3749int consumer_data_pending(uint64_t id)
ca22feea
DG
3750{
3751 int ret;
3752 struct lttng_ht_iter iter;
3753 struct lttng_ht *ht;
3754 struct lttng_consumer_stream *stream;
cd9adb8b 3755 struct consumer_relayd_sock_pair *relayd = nullptr;
6d805429 3756 int (*data_pending)(struct lttng_consumer_stream *);
ca22feea 3757
6d805429 3758 DBG("Consumer data pending command on session id %" PRIu64, id);
ca22feea 3759
56047f5a 3760 lttng::urcu::read_lock_guard read_lock;
fa29bfbf 3761 pthread_mutex_lock(&the_consumer_data.lock);
ca22feea 3762
fa29bfbf 3763 switch (the_consumer_data.type) {
ca22feea 3764 case LTTNG_CONSUMER_KERNEL:
6d805429 3765 data_pending = lttng_kconsumer_data_pending;
ca22feea
DG
3766 break;
3767 case LTTNG_CONSUMER32_UST:
3768 case LTTNG_CONSUMER64_UST:
6d805429 3769 data_pending = lttng_ustconsumer_data_pending;
ca22feea
DG
3770 break;
3771 default:
3772 ERR("Unknown consumer data type");
a0377dfe 3773 abort();
ca22feea
DG
3774 }
3775
3776 /* Ease our life a bit */
fa29bfbf 3777 ht = the_consumer_data.stream_list_ht;
ca22feea 3778
c8f59ee5 3779 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
3780 ht->hash_fct(&id, lttng_ht_seed),
3781 ht->match_fct,
3782 &id,
3783 &iter.iter,
3784 stream,
3785 node_session_id.node)
3786 {
bb586a6e 3787 pthread_mutex_lock(&stream->lock);
ca22feea 3788
4e9a4686
DG
3789 /*
3790 * A removed node from the hash table indicates that the stream has
3791 * been deleted thus having a guarantee that the buffers are closed
3792 * on the consumer side. However, data can still be transmitted
3793 * over the network so don't skip the relayd check.
3794 */
3795 ret = cds_lfht_is_node_deleted(&stream->node.node);
3796 if (!ret) {
3797 /* Check the stream if there is data in the buffers. */
6d805429
DG
3798 ret = data_pending(stream);
3799 if (ret == 1) {
4e9a4686 3800 pthread_mutex_unlock(&stream->lock);
f7079f67 3801 goto data_pending;
4e9a4686
DG
3802 }
3803 }
3804
d9f0c7c7
JR
3805 pthread_mutex_unlock(&stream->lock);
3806 }
3807
3808 relayd = find_relayd_by_session_id(id);
3809 if (relayd) {
3810 unsigned int is_data_inflight = 0;
3811
3812 /* Send init command for data pending. */
3813 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 3814 ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
d9f0c7c7
JR
3815 if (ret < 0) {
3816 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3817 /* Communication error thus the relayd so no data pending. */
3818 goto data_not_pending;
3819 }
3820
3821 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
3822 ht->hash_fct(&id, lttng_ht_seed),
3823 ht->match_fct,
3824 &id,
3825 &iter.iter,
3826 stream,
3827 node_session_id.node)
3828 {
c8f59ee5 3829 if (stream->metadata_flag) {
ad7051c0 3830 ret = relayd_quiescent_control(&relayd->control_sock,
28ab034a 3831 stream->relayd_stream_id);
c8f59ee5 3832 } else {
6d805429 3833 ret = relayd_data_pending(&relayd->control_sock,
28ab034a
JG
3834 stream->relayd_stream_id,
3835 stream->next_net_seq_num - 1);
c8f59ee5 3836 }
d9f0c7c7
JR
3837
3838 if (ret == 1) {
3839 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3840 goto data_pending;
3841 } else if (ret < 0) {
28ab034a
JG
3842 ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
3843 relayd->net_seq_idx);
9276e5c8
JR
3844 lttng_consumer_cleanup_relayd(relayd);
3845 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
9276e5c8
JR
3846 goto data_not_pending;
3847 }
c8f59ee5 3848 }
f7079f67 3849
d9f0c7c7 3850 /* Send end command for data pending. */
28ab034a
JG
3851 ret = relayd_end_data_pending(
3852 &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
f7079f67 3853 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
bdd88757 3854 if (ret < 0) {
28ab034a
JG
3855 ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
3856 relayd->net_seq_idx);
9276e5c8 3857 lttng_consumer_cleanup_relayd(relayd);
f7079f67
DG
3858 goto data_not_pending;
3859 }
bdd88757
DG
3860 if (is_data_inflight) {
3861 goto data_pending;
3862 }
f7079f67
DG
3863 }
3864
ca22feea 3865 /*
f7079f67
DG
3866 * Finding _no_ node in the hash table and no inflight data means that the
3867 * stream(s) have been removed thus data is guaranteed to be available for
3868 * analysis from the trace files.
ca22feea
DG
3869 */
3870
f7079f67 3871data_not_pending:
ca22feea 3872 /* Data is available to be read by a viewer. */
fa29bfbf 3873 pthread_mutex_unlock(&the_consumer_data.lock);
6d805429 3874 return 0;
ca22feea 3875
f7079f67 3876data_pending:
ca22feea 3877 /* Data is still being extracted from buffers. */
fa29bfbf 3878 pthread_mutex_unlock(&the_consumer_data.lock);
6d805429 3879 return 1;
ca22feea 3880}
f50f23d9
DG
3881
3882/*
3883 * Send a ret code status message to the sessiond daemon.
3884 *
3885 * Return the sendmsg() return value.
3886 */
3887int consumer_send_status_msg(int sock, int ret_code)
3888{
3889 struct lttcomm_consumer_status_msg msg;
3890
53efb85a 3891 memset(&msg, 0, sizeof(msg));
97535efa 3892 msg.ret_code = (lttcomm_return_code) ret_code;
f50f23d9
DG
3893
3894 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3895}
ffe60014
DG
3896
3897/*
3898 * Send a channel status message to the sessiond daemon.
3899 *
3900 * Return the sendmsg() return value.
3901 */
28ab034a 3902int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
ffe60014
DG
3903{
3904 struct lttcomm_consumer_status_channel msg;
3905
a0377dfe 3906 LTTNG_ASSERT(sock >= 0);
ffe60014 3907
53efb85a 3908 memset(&msg, 0, sizeof(msg));
ffe60014 3909 if (!channel) {
0c759fc9 3910 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
ffe60014 3911 } else {
0c759fc9 3912 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ffe60014
DG
3913 msg.key = channel->key;
3914 msg.stream_count = channel->streams.count;
3915 }
3916
3917 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3918}
5c786ded 3919
d07ceecd 3920unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
28ab034a
JG
3921 unsigned long produced_pos,
3922 uint64_t nb_packets_per_stream,
3923 uint64_t max_sb_size)
5c786ded 3924{
d07ceecd 3925 unsigned long start_pos;
5c786ded 3926
d07ceecd 3927 if (!nb_packets_per_stream) {
28ab034a 3928 return consumed_pos; /* Grab everything */
d07ceecd 3929 }
1cbd136b 3930 start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
d07ceecd
MD
3931 start_pos -= max_sb_size * nb_packets_per_stream;
3932 if ((long) (start_pos - consumed_pos) < 0) {
28ab034a 3933 return consumed_pos; /* Grab everything */
d07ceecd
MD
3934 }
3935 return start_pos;
5c786ded 3936}
a1ae2ea5 3937
c1dcb8bb
JG
3938/* Stream lock must be held by the caller. */
3939static int sample_stream_positions(struct lttng_consumer_stream *stream,
28ab034a
JG
3940 unsigned long *produced,
3941 unsigned long *consumed)
c1dcb8bb
JG
3942{
3943 int ret;
3944
3945 ASSERT_LOCKED(stream->lock);
3946
3947 ret = lttng_consumer_sample_snapshot_positions(stream);
3948 if (ret < 0) {
3949 ERR("Failed to sample snapshot positions");
3950 goto end;
3951 }
3952
3953 ret = lttng_consumer_get_produced_snapshot(stream, produced);
3954 if (ret < 0) {
3955 ERR("Failed to sample produced position");
3956 goto end;
3957 }
3958
3959 ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
3960 if (ret < 0) {
3961 ERR("Failed to sample consumed position");
3962 goto end;
3963 }
3964
3965end:
3966 return ret;
3967}
3968
b99a8d42
JD
3969/*
3970 * Sample the rotate position for all the streams of a channel. If a stream
3971 * is already at the rotate position (produced == consumed), we flag it as
3972 * ready for rotation. The rotation of ready streams occurs after we have
3973 * replied to the session daemon that we have finished sampling the positions.
92b7a7f8 3974 * Must be called with RCU read-side lock held to ensure existence of channel.
b99a8d42
JD
3975 *
3976 * Returns 0 on success, < 0 on error
3977 */
92b7a7f8 3978int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
28ab034a
JG
3979 uint64_t key,
3980 uint64_t relayd_id)
b99a8d42
JD
3981{
3982 int ret;
b99a8d42
JD
3983 struct lttng_consumer_stream *stream;
3984 struct lttng_ht_iter iter;
fa29bfbf 3985 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
c35f9726
JG
3986 struct lttng_dynamic_array stream_rotation_positions;
3987 uint64_t next_chunk_id, stream_count = 0;
3988 enum lttng_trace_chunk_status chunk_status;
3989 const bool is_local_trace = relayd_id == -1ULL;
cd9adb8b 3990 struct consumer_relayd_sock_pair *relayd = nullptr;
c35f9726 3991 bool rotating_to_new_chunk = true;
b32703d6
JG
3992 /* Array of `struct lttng_consumer_stream *` */
3993 struct lttng_dynamic_pointer_array streams_packet_to_open;
3994 size_t stream_idx;
b99a8d42 3995
48b7cdc2
FD
3996 ASSERT_RCU_READ_LOCKED();
3997
b99a8d42
JD
3998 DBG("Consumer sample rotate position for channel %" PRIu64, key);
3999
cd9adb8b
JG
4000 lttng_dynamic_array_init(&stream_rotation_positions,
4001 sizeof(struct relayd_stream_rotation_position),
4002 nullptr);
4003 lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
c35f9726 4004
56047f5a 4005 lttng::urcu::read_lock_guard read_lock;
b99a8d42 4006
b99a8d42 4007 pthread_mutex_lock(&channel->lock);
a0377dfe 4008 LTTNG_ASSERT(channel->trace_chunk);
28ab034a 4009 chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
c35f9726
JG
4010 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4011 ret = -1;
4012 goto end_unlock_channel;
4013 }
b99a8d42
JD
4014
4015 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
4016 ht->hash_fct(&channel->key, lttng_ht_seed),
4017 ht->match_fct,
4018 &channel->key,
4019 &iter.iter,
4020 stream,
4021 node_channel_id.node)
4022 {
a40a503f 4023 unsigned long produced_pos = 0, consumed_pos = 0;
b99a8d42
JD
4024
4025 health_code_update();
4026
4027 /*
4028 * Lock stream because we are about to change its state.
4029 */
4030 pthread_mutex_lock(&stream->lock);
4031
c35f9726
JG
4032 if (stream->trace_chunk == stream->chan->trace_chunk) {
4033 rotating_to_new_chunk = false;
4034 }
4035
a40a503f 4036 /*
c1dcb8bb 4037 * Do not flush a packet when rotating from a NULL trace
a9dde553 4038 * chunk. The stream has no means to output data, and the prior
c1dcb8bb
JG
4039 * rotation which rotated to NULL performed that side-effect
4040 * already. No new data can be produced when a stream has no
4041 * associated trace chunk (e.g. a stop followed by a rotate).
a40a503f 4042 */
a9dde553 4043 if (stream->trace_chunk) {
c1dcb8bb
JG
4044 bool flush_active;
4045
4046 if (stream->metadata_flag) {
4047 /*
4048 * Don't produce an empty metadata packet,
4049 * simply close the current one.
4050 *
4051 * Metadata is regenerated on every trace chunk
4052 * switch; there is no concern that no data was
4053 * produced.
4054 */
4055 flush_active = true;
4056 } else {
4057 /*
4058 * Only flush an empty packet if the "packet
4059 * open" could not be performed on transition
4060 * to a new trace chunk and no packets were
4061 * consumed within the chunk's lifetime.
4062 */
4063 if (stream->opened_packet_in_current_trace_chunk) {
4064 flush_active = true;
4065 } else {
4066 /*
4067 * Stream could have been full at the
4068 * time of rotation, but then have had
4069 * no activity at all.
4070 *
4071 * It is important to flush a packet
4072 * to prevent 0-length files from being
4073 * produced as most viewers choke on
4074 * them.
4075 *
4076 * Unfortunately viewers will not be
4077 * able to know that tracing was active
4078 * for this stream during this trace
4079 * chunk's lifetime.
4080 */
28ab034a
JG
4081 ret = sample_stream_positions(
4082 stream, &produced_pos, &consumed_pos);
c1dcb8bb
JG
4083 if (ret) {
4084 goto end_unlock_stream;
4085 }
4086
4087 /*
4088 * Don't flush an empty packet if data
4089 * was produced; it will be consumed
4090 * before the rotation completes.
4091 */
4092 flush_active = produced_pos != consumed_pos;
4093 if (!flush_active) {
c1dcb8bb
JG
4094 const char *trace_chunk_name;
4095 uint64_t trace_chunk_id;
4096
4097 chunk_status = lttng_trace_chunk_get_name(
28ab034a
JG
4098 stream->trace_chunk,
4099 &trace_chunk_name,
cd9adb8b 4100 nullptr);
c1dcb8bb
JG
4101 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
4102 trace_chunk_name = "none";
4103 }
4104
4105 /*
4106 * Consumer trace chunks are
4107 * never anonymous.
4108 */
4109 chunk_status = lttng_trace_chunk_get_id(
28ab034a 4110 stream->trace_chunk, &trace_chunk_id);
a0377dfe 4111 LTTNG_ASSERT(chunk_status ==
28ab034a 4112 LTTNG_TRACE_CHUNK_STATUS_OK);
c1dcb8bb
JG
4113
4114 DBG("Unable to open packet for stream during trace chunk's lifetime. "
28ab034a
JG
4115 "Flushing an empty packet to prevent an empty file from being created: "
4116 "stream id = %" PRIu64
4117 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
4118 stream->key,
4119 trace_chunk_name,
4120 trace_chunk_id);
c1dcb8bb
JG
4121 }
4122 }
4123 }
4124
a9dde553 4125 /*
c1dcb8bb
JG
4126 * Close the current packet before sampling the
4127 * ring buffer positions.
a9dde553 4128 */
c1dcb8bb 4129 ret = consumer_stream_flush_buffer(stream, flush_active);
a9dde553
MD
4130 if (ret < 0) {
4131 ERR("Failed to flush stream %" PRIu64 " during channel rotation",
28ab034a 4132 stream->key);
a9dde553
MD
4133 goto end_unlock_stream;
4134 }
b99a8d42
JD
4135 }
4136
a40a503f
MD
4137 ret = lttng_consumer_take_snapshot(stream);
4138 if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
4139 ERR("Failed to sample snapshot position during channel rotation");
b99a8d42
JD
4140 goto end_unlock_stream;
4141 }
a40a503f 4142 if (!ret) {
28ab034a 4143 ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
a40a503f
MD
4144 if (ret < 0) {
4145 ERR("Failed to sample produced position during channel rotation");
4146 goto end_unlock_stream;
4147 }
b99a8d42 4148
28ab034a 4149 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
a40a503f
MD
4150 if (ret < 0) {
4151 ERR("Failed to sample consumed position during channel rotation");
4152 goto end_unlock_stream;
4153 }
4154 }
4155 /*
4156 * Align produced position on the start-of-packet boundary of the first
4157 * packet going into the next trace chunk.
4158 */
1cbd136b 4159 produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
a40a503f 4160 if (consumed_pos == produced_pos) {
f8528c7a 4161 DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
28ab034a
JG
4162 stream->key,
4163 produced_pos,
4164 consumed_pos);
b99a8d42 4165 stream->rotate_ready = true;
f8528c7a
MD
4166 } else {
4167 DBG("Different consumed and produced positions "
28ab034a
JG
4168 "for stream %" PRIu64 " produced = %lu consumed = %lu",
4169 stream->key,
4170 produced_pos,
4171 consumed_pos);
b99a8d42 4172 }
633d0182 4173 /*
a40a503f
MD
4174 * The rotation position is based on the packet_seq_num of the
4175 * packet following the last packet that was consumed for this
4176 * stream, incremented by the offset between produced and
4177 * consumed positions. This rotation position is a lower bound
4178 * (inclusive) at which the next trace chunk starts. Since it
4179 * is a lower bound, it is OK if the packet_seq_num does not
4180 * correspond exactly to the same packet identified by the
4181 * consumed_pos, which can happen in overwrite mode.
633d0182 4182 */
a40a503f
MD
4183 if (stream->sequence_number_unavailable) {
4184 /*
4185 * Rotation should never be performed on a session which
4186 * interacts with a pre-2.8 lttng-modules, which does
4187 * not implement packet sequence number.
4188 */
4189 ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
28ab034a 4190 stream->key);
a40a503f 4191 ret = -1;
b99a8d42
JD
4192 goto end_unlock_stream;
4193 }
a40a503f 4194 stream->rotate_position = stream->last_sequence_number + 1 +
28ab034a 4195 ((produced_pos - consumed_pos) / stream->max_sb_size);
f8528c7a 4196 DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
28ab034a
JG
4197 stream->key,
4198 stream->rotate_position);
b99a8d42 4199
c35f9726 4200 if (!is_local_trace) {
633d0182
JG
4201 /*
4202 * The relay daemon control protocol expects a rotation
4203 * position as "the sequence number of the first packet
a40a503f 4204 * _after_ the current trace chunk".
633d0182 4205 */
c35f9726
JG
4206 const struct relayd_stream_rotation_position position = {
4207 .stream_id = stream->relayd_stream_id,
a40a503f 4208 .rotate_at_seq_num = stream->rotate_position,
c35f9726
JG
4209 };
4210
28ab034a
JG
4211 ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
4212 &position);
c35f9726
JG
4213 if (ret) {
4214 ERR("Failed to allocate stream rotation position");
4215 goto end_unlock_stream;
4216 }
4217 stream_count++;
4218 }
f96af312
JG
4219
4220 stream->opened_packet_in_current_trace_chunk = false;
4221
4222 if (rotating_to_new_chunk && !stream->metadata_flag) {
4223 /*
4224 * Attempt to flush an empty packet as close to the
4225 * rotation point as possible. In the event where a
4226 * stream remains inactive after the rotation point,
4227 * this ensures that the new trace chunk has a
4228 * beginning timestamp set at the begining of the
4229 * trace chunk instead of only creating an empty
4230 * packet when the trace chunk is stopped.
4231 *
4232 * This indicates to the viewers that the stream
4233 * was being recorded, but more importantly it
4234 * allows viewers to determine a useable trace
4235 * intersection.
4236 *
4237 * This presents a problem in the case where the
4238 * ring-buffer is completely full.
4239 *
4240 * Consider the following scenario:
4241 * - The consumption of data is slow (slow network,
4242 * for instance),
4243 * - The ring buffer is full,
4244 * - A rotation is initiated,
4245 * - The flush below does nothing (no space left to
4246 * open a new packet),
4247 * - The other streams rotate very soon, and new
4248 * data is produced in the new chunk,
4249 * - This stream completes its rotation long after the
4250 * rotation was initiated
4251 * - The session is stopped before any event can be
4252 * produced in this stream's buffers.
4253 *
4254 * The resulting trace chunk will have a single packet
4255 * temporaly at the end of the trace chunk for this
4256 * stream making the stream intersection more narrow
4257 * than it should be.
4258 *
4259 * To work-around this, an empty flush is performed
4260 * after the first consumption of a packet during a
4261 * rotation if open_packet fails. The idea is that
4262 * consuming a packet frees enough space to switch
4263 * packets in this scenario and allows the tracer to
4264 * "stamp" the beginning of the new trace chunk at the
4265 * earliest possible point.
b32703d6
JG
4266 *
4267 * The packet open is performed after the channel
4268 * rotation to ensure that no attempt to open a packet
4269 * is performed in a stream that has no active trace
4270 * chunk.
f96af312 4271 */
28ab034a
JG
4272 ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
4273 stream);
b32703d6
JG
4274 if (ret) {
4275 PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
f96af312
JG
4276 ret = -1;
4277 goto end_unlock_stream;
f96af312
JG
4278 }
4279 }
4280
b99a8d42
JD
4281 pthread_mutex_unlock(&stream->lock);
4282 }
cd9adb8b 4283 stream = nullptr;
b99a8d42 4284
b32703d6
JG
4285 if (!is_local_trace) {
4286 relayd = consumer_find_relayd(relayd_id);
4287 if (!relayd) {
4288 ERR("Failed to find relayd %" PRIu64, relayd_id);
4289 ret = -1;
4290 goto end_unlock_channel;
4291 }
c35f9726 4292
b32703d6 4293 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a
JG
4294 ret = relayd_rotate_streams(&relayd->control_sock,
4295 stream_count,
cd9adb8b 4296 rotating_to_new_chunk ? &next_chunk_id : nullptr,
28ab034a
JG
4297 (const struct relayd_stream_rotation_position *)
4298 stream_rotation_positions.buffer.data);
b32703d6
JG
4299 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4300 if (ret < 0) {
4301 ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
28ab034a 4302 relayd->net_seq_idx);
b32703d6
JG
4303 lttng_consumer_cleanup_relayd(relayd);
4304 goto end_unlock_channel;
4305 }
c35f9726
JG
4306 }
4307
b32703d6 4308 for (stream_idx = 0;
28ab034a
JG
4309 stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
4310 stream_idx++) {
b32703d6
JG
4311 enum consumer_stream_open_packet_status status;
4312
97535efa 4313 stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
28ab034a 4314 &streams_packet_to_open, stream_idx);
b32703d6
JG
4315
4316 pthread_mutex_lock(&stream->lock);
4317 status = consumer_stream_open_packet(stream);
4318 pthread_mutex_unlock(&stream->lock);
4319 switch (status) {
4320 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
4321 DBG("Opened a packet after a rotation: stream id = %" PRIu64
4322 ", channel name = %s, session id = %" PRIu64,
28ab034a
JG
4323 stream->key,
4324 stream->chan->name,
4325 stream->chan->session_id);
b32703d6
JG
4326 break;
4327 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
4328 /*
4329 * Can't open a packet as there is no space left
4330 * in the buffer. A new packet will be opened
4331 * once one has been consumed.
4332 */
4333 DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
4334 ", channel name = %s, session id = %" PRIu64,
28ab034a
JG
4335 stream->key,
4336 stream->chan->name,
4337 stream->chan->session_id);
b32703d6
JG
4338 break;
4339 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
4340 /* Logged by callee. */
4341 ret = -1;
7a86c13d 4342 goto end_unlock_channel;
b32703d6
JG
4343 default:
4344 abort();
4345 }
c35f9726
JG
4346 }
4347
b32703d6 4348 pthread_mutex_unlock(&channel->lock);
b99a8d42
JD
4349 ret = 0;
4350 goto end;
4351
4352end_unlock_stream:
4353 pthread_mutex_unlock(&stream->lock);
c35f9726 4354end_unlock_channel:
b99a8d42
JD
4355 pthread_mutex_unlock(&channel->lock);
4356end:
c35f9726 4357 lttng_dynamic_array_reset(&stream_rotation_positions);
b32703d6 4358 lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
b99a8d42
JD
4359 return ret;
4360}
4361
28ab034a 4362static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4363{
4364 int ret = 0;
4365 unsigned long consumed_pos_before, consumed_pos_after;
4366
4367 ret = lttng_consumer_sample_snapshot_positions(stream);
4368 if (ret < 0) {
4369 ERR("Taking snapshot positions");
4370 goto end;
4371 }
4372
4373 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4374 if (ret < 0) {
4375 ERR("Consumed snapshot position");
4376 goto end;
4377 }
4378
fa29bfbf 4379 switch (the_consumer_data.type) {
5f3aff8b
MD
4380 case LTTNG_CONSUMER_KERNEL:
4381 ret = kernctl_buffer_clear(stream->wait_fd);
4382 if (ret < 0) {
96393977 4383 ERR("Failed to clear kernel stream (ret = %d)", ret);
5f3aff8b
MD
4384 goto end;
4385 }
4386 break;
4387 case LTTNG_CONSUMER32_UST:
4388 case LTTNG_CONSUMER64_UST:
881fc67f
MD
4389 ret = lttng_ustconsumer_clear_buffer(stream);
4390 if (ret < 0) {
4391 ERR("Failed to clear ust stream (ret = %d)", ret);
4392 goto end;
4393 }
5f3aff8b
MD
4394 break;
4395 default:
4396 ERR("Unknown consumer_data type");
4397 abort();
4398 }
4399
4400 ret = lttng_consumer_sample_snapshot_positions(stream);
4401 if (ret < 0) {
4402 ERR("Taking snapshot positions");
4403 goto end;
4404 }
4405 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4406 if (ret < 0) {
4407 ERR("Consumed snapshot position");
4408 goto end;
4409 }
4410 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4411end:
4412 return ret;
4413}
4414
28ab034a 4415static int consumer_clear_stream(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4416{
4417 int ret;
4418
cd9adb8b 4419 ret = consumer_stream_flush_buffer(stream, true);
5f3aff8b 4420 if (ret < 0) {
28ab034a 4421 ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4422 ret = LTTCOMM_CONSUMERD_FATAL;
4423 goto error;
4424 }
4425
4426 ret = consumer_clear_buffer(stream);
4427 if (ret < 0) {
28ab034a 4428 ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4429 ret = LTTCOMM_CONSUMERD_FATAL;
4430 goto error;
4431 }
4432
4433 ret = LTTCOMM_CONSUMERD_SUCCESS;
4434error:
4435 return ret;
4436}
4437
28ab034a 4438static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
5f3aff8b
MD
4439{
4440 int ret;
4441 struct lttng_consumer_stream *stream;
4442
56047f5a 4443 lttng::urcu::read_lock_guard read_lock;
5f3aff8b 4444 pthread_mutex_lock(&channel->lock);
28ab034a 4445 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
5f3aff8b
MD
4446 health_code_update();
4447 pthread_mutex_lock(&stream->lock);
4448 ret = consumer_clear_stream(stream);
4449 if (ret) {
4450 goto error_unlock;
4451 }
4452 pthread_mutex_unlock(&stream->lock);
4453 }
4454 pthread_mutex_unlock(&channel->lock);
5f3aff8b
MD
4455 return 0;
4456
4457error_unlock:
4458 pthread_mutex_unlock(&stream->lock);
4459 pthread_mutex_unlock(&channel->lock);
5f3aff8b
MD
4460 return ret;
4461}
4462
02d02e31
JD
4463/*
4464 * Check if a stream is ready to be rotated after extracting it.
4465 *
4466 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4467 * error. Stream lock must be held.
4468 */
4469int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4470{
28ab034a
JG
4471 DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
4472 " last_sequence_number %" PRIu64,
4473 stream->key,
4474 stream->rotate_ready,
4475 stream->rotate_position,
4476 stream->last_sequence_number);
02d02e31 4477 if (stream->rotate_ready) {
a40a503f 4478 return 1;
02d02e31
JD
4479 }
4480
4481 /*
a40a503f
MD
4482 * If packet seq num is unavailable, it means we are interacting
4483 * with a pre-2.8 lttng-modules which does not implement the
4484 * sequence number. Rotation should never be used by sessiond in this
4485 * scenario.
02d02e31 4486 */
a40a503f
MD
4487 if (stream->sequence_number_unavailable) {
4488 ERR("Internal error: rotation used on stream %" PRIu64
28ab034a
JG
4489 " with unavailable sequence number",
4490 stream->key);
a40a503f 4491 return -1;
02d02e31
JD
4492 }
4493
28ab034a 4494 if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
a40a503f 4495 return 0;
02d02e31
JD
4496 }
4497
a40a503f
MD
4498 /*
4499 * Rotate position not reached yet. The stream rotate position is
4500 * the position of the next packet belonging to the next trace chunk,
4501 * but consumerd considers rotation ready when reaching the last
4502 * packet of the current chunk, hence the "rotate_position - 1".
4503 */
f8528c7a 4504
28ab034a
JG
4505 DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
4506 " rotate_position %" PRIu64,
4507 stream->key,
4508 stream->last_sequence_number,
4509 stream->rotate_position);
a40a503f
MD
4510 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4511 return 1;
02d02e31 4512 }
02d02e31 4513
a40a503f 4514 return 0;
02d02e31
JD
4515}
4516
d73bf3d7
JD
4517/*
4518 * Reset the state for a stream after a rotation occurred.
4519 */
4520void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
4521{
28ab034a 4522 DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
a40a503f 4523 stream->rotate_position = -1ULL;
d73bf3d7
JD
4524 stream->rotate_ready = false;
4525}
4526
4527/*
4528 * Perform the rotation a local stream file.
4529 */
28ab034a 4530static int rotate_local_stream(struct lttng_consumer_stream *stream)
d73bf3d7 4531{
d2956687 4532 int ret = 0;
d73bf3d7 4533
d2956687 4534 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
28ab034a
JG
4535 stream->key,
4536 stream->chan->key);
d73bf3d7 4537 stream->tracefile_size_current = 0;
d2956687 4538 stream->tracefile_count_current = 0;
d73bf3d7 4539
d2956687
JG
4540 if (stream->out_fd >= 0) {
4541 ret = close(stream->out_fd);
4542 if (ret) {
4543 PERROR("Failed to close stream out_fd of channel \"%s\"",
28ab034a 4544 stream->chan->name);
d2956687
JG
4545 }
4546 stream->out_fd = -1;
4547 }
d73bf3d7 4548
d2956687 4549 if (stream->index_file) {
d73bf3d7 4550 lttng_index_file_put(stream->index_file);
cd9adb8b 4551 stream->index_file = nullptr;
d73bf3d7
JD
4552 }
4553
d2956687
JG
4554 if (!stream->trace_chunk) {
4555 goto end;
4556 }
d73bf3d7 4557
d2956687 4558 ret = consumer_stream_create_output_files(stream, true);
d73bf3d7
JD
4559end:
4560 return ret;
d73bf3d7
JD
4561}
4562
/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * On rotation, the stream drops its reference to its current trace chunk
 * and adopts the channel's now-current chunk (or none). Local streams are
 * rotated immediately; metadata streams are re-dumped into the new chunk.
 *
 * Return 0 on success, a negative number of error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	DBG("Consumer rotate stream %" PRIu64, stream->key);

	/*
	 * Update the stream's 'current' chunk to the session's (channel)
	 * now-current chunk.
	 *
	 * The reference to the out-going chunk is released first; the
	 * comparison below is still valid since only the address of the
	 * old chunk is used.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	if (stream->chan->trace_chunk == stream->trace_chunk) {
		/*
		 * A channel can be rotated and not have a "next" chunk
		 * to transition to. In that case, the channel's "current chunk"
		 * has not been closed yet, but it has not been updated to
		 * a "next" trace chunk either. Hence, the stream, like its
		 * parent channel, becomes part of no chunk and can't output
		 * anything until a new trace chunk is created.
		 */
		stream->trace_chunk = nullptr;
	} else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
		ret = -1;
		goto error;
	} else {
		/*
		 * Update the stream's trace chunk to its parent channel's
		 * current trace chunk.
		 */
		stream->trace_chunk = stream->chan->trace_chunk;
	}

	/* A (uint64_t) -1ULL net_seq_idx denotes a stream with no relayd (local trace). */
	if (stream->net_seq_idx == (uint64_t) -1ULL) {
		ret = rotate_local_stream(stream);
		if (ret < 0) {
			ERR("Failed to rotate stream, ret = %i", ret);
			goto error;
		}
	}

	if (stream->metadata_flag && stream->trace_chunk) {
		/*
		 * If the stream has transitioned to a new trace
		 * chunk, the metadata should be re-dumped to the
		 * newest chunk.
		 *
		 * However, it is possible for a stream to transition to
		 * a "no-chunk" state. This can happen if a rotation
		 * occurs on an inactive session. In such cases, the metadata
		 * regeneration will happen when the next trace chunk is
		 * created.
		 */
		ret = consumer_metadata_stream_dump(stream);
		if (ret) {
			goto error;
		}
	}
	/* Clear rotate_ready/position state now that the rotation completed. */
	lttng_consumer_reset_stream_rotate_state(stream);

	ret = 0;

error:
	return ret;
}
4634
/*
 * Rotate all the ready streams now.
 *
 * This is especially important for low throughput streams that have already
 * been consumed, we cannot wait for their next packet to perform the
 * rotation.
 * Need to be called with RCU read-side lock held to ensure existence of
 * channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
{
	int ret;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;

	ASSERT_RCU_READ_LOCKED();

	lttng::urcu::read_lock_guard read_lock;

	DBG("Consumer rotate ready streams in channel %" PRIu64, key);

	/* Walk only the streams belonging to this channel (keyed by channel key). */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		health_code_update();

		/*
		 * Lock ordering: channel lock before stream lock, as required
		 * by lttng_consumer_rotate_stream().
		 */
		pthread_mutex_lock(&stream->chan->lock);
		pthread_mutex_lock(&stream->lock);

		if (!stream->rotate_ready) {
			pthread_mutex_unlock(&stream->lock);
			pthread_mutex_unlock(&stream->chan->lock);
			continue;
		}
		DBG("Consumer rotate ready stream %" PRIu64, stream->key);

		ret = lttng_consumer_rotate_stream(stream);
		pthread_mutex_unlock(&stream->lock);
		pthread_mutex_unlock(&stream->chan->lock);
		if (ret) {
			goto end;
		}
	}

	ret = 0;

end:
	return ret;
}
4692
28ab034a
JG
4693enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
4694 const lttng_uuid& sessiond_uuid)
00fb02ac 4695{
d2956687 4696 enum lttcomm_return_code ret;
c70636a7 4697 char uuid_str[LTTNG_UUID_STR_LEN];
00fb02ac 4698
d2956687
JG
4699 if (ctx->sessiond_uuid.is_set) {
4700 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
00fb02ac
JD
4701 goto end;
4702 }
4703
d2956687 4704 ctx->sessiond_uuid.is_set = true;
328c2fe7 4705 ctx->sessiond_uuid.value = sessiond_uuid;
d2956687
JG
4706 ret = LTTCOMM_CONSUMERD_SUCCESS;
4707 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4708 DBG("Received session daemon UUID: %s", uuid_str);
00fb02ac
JD
4709end:
4710 return ret;
4711}
4712
/*
 * Create a new trace chunk, publish it in the consumer's chunk registry,
 * assign it to every channel of the session and, if applicable, announce
 * it to the relay daemon.
 *
 * On any failure after publication, the creation is rolled back by closing
 * the chunk so the session daemon's view stays consistent.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success, an error code otherwise.
 */
enum lttcomm_return_code
lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
				  uint64_t session_id,
				  uint64_t chunk_id,
				  time_t chunk_creation_timestamp,
				  const char *chunk_override_name,
				  const struct lttng_credentials *credentials,
				  struct lttng_directory_handle *chunk_directory_handle)
{
	int ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
	enum lttng_trace_chunk_status chunk_status;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	char creation_timestamp_buffer[ISO8601_STR_LEN];
	const char *relayd_id_str = "(none)";
	const char *creation_timestamp_str;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	/* Local protocol error. */
	LTTNG_ASSERT(chunk_creation_timestamp);
	ret = time_to_iso8601_str(chunk_creation_timestamp,
				  creation_timestamp_buffer,
				  sizeof(creation_timestamp_buffer));
	creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";

	DBG("Consumer create trace chunk command: relay_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
	    ", chunk_creation_timestamp = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    chunk_override_name ?: "(none)",
	    creation_timestamp_str);

	/*
	 * The trace chunk registry, as used by the consumer daemon, implicitly
	 * owns the trace chunks. This is only needed in the consumer since
	 * the consumer has no notion of a session beyond session IDs being
	 * used to identify other objects.
	 *
	 * The lttng_trace_chunk_registry_publish() call below provides a
	 * reference which is not released; it implicitly becomes the session
	 * daemon's reference to the chunk in the consumer daemon.
	 *
	 * The lifetime of trace chunks in the consumer daemon is managed by
	 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
	 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
	 */
	created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
	if (!created_chunk) {
		ERR("Failed to create trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	if (chunk_override_name) {
		chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	if (chunk_directory_handle) {
		chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk credentials");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
		/*
		 * The consumer daemon has no ownership of the chunk output
		 * directory.
		 */
		chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
		/* Cleared to mark that the handle was handed off to the chunk. */
		chunk_directory_handle = nullptr;
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk's directory handle");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	published_chunk = lttng_trace_chunk_registry_publish_chunk(
		the_consumer_data.chunk_registry, session_id, created_chunk);
	/* The registry holds its own reference from this point on. */
	lttng_trace_chunk_put(created_chunk);
	created_chunk = nullptr;
	if (!published_chunk) {
		ERR("Failed to publish trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	{
		/* Assign the published chunk to every channel of this session. */
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry_duplicate(
			the_consumer_data.channels_by_session_id_ht->ht,
			the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
									      lttng_ht_seed),
			the_consumer_data.channels_by_session_id_ht->match_fct,
			&session_id,
			&iter.iter,
			channel,
			channels_by_session_id_ht_node.node)
		{
			ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
			if (ret) {
				/*
				 * Roll-back the creation of this chunk.
				 *
				 * This is important since the session daemon will
				 * assume that the creation of this chunk failed and
				 * will never ask for it to be closed, resulting
				 * in a leak and an inconsistent state for some
				 * channels.
				 */
				enum lttcomm_return_code close_ret;
				char path[LTTNG_PATH_MAX];

				DBG("Failed to set new trace chunk on existing channels, rolling back");
				close_ret =
					lttng_consumer_close_trace_chunk(relayd_id,
									 session_id,
									 chunk_id,
									 chunk_creation_timestamp,
									 nullptr,
									 path);
				if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
					ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
					    ", chunk_id = %" PRIu64,
					    session_id,
					    chunk_id);
				}

				ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
				break;
			}
		}
	}

	if (relayd_id) {
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		/* NOTE: 'ret' here is only meaningful when 'relayd' was found. */
		if (!relayd || ret) {
			enum lttcomm_return_code close_ret;
			char path[LTTNG_PATH_MAX];

			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
								     session_id,
								     chunk_id,
								     chunk_creation_timestamp,
								     nullptr,
								     path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
				    ", chunk_id = %" PRIu64,
				    session_id,
				    chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
error:
	/* Release the reference returned by the "publish" operation. */
	lttng_trace_chunk_put(published_chunk);
	/* No-op when created_chunk is nullptr (the common, non-error case). */
	lttng_trace_chunk_put(created_chunk);
	return ret_code;
}
4905
/*
 * Close a trace chunk: stamp its close timestamp (and optional close
 * command), detach it from every channel that still references it, and
 * forward the close to the relay daemon when applicable.
 *
 * 'path' is an output buffer (LTTNG_PATH_MAX) filled by the relayd close
 * operation; only meaningful when relayd_id is provided.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success, an error code otherwise.
 */
enum lttcomm_return_code
lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
				 uint64_t session_id,
				 uint64_t chunk_id,
				 time_t chunk_close_timestamp,
				 const enum lttng_trace_chunk_command_type *close_command,
				 char *path)
{
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *chunk;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	const char *close_command_name = "none";
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	enum lttng_trace_chunk_status chunk_status;

	if (relayd_id) {
		int ret;

		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}
	if (close_command) {
		close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
	}

	DBG("Consumer close trace chunk command: relayd_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    close_command_name);

	chunk = lttng_trace_chunk_registry_find_chunk(
		the_consumer_data.chunk_registry, session_id, chunk_id);
	if (!chunk) {
		ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
		    session_id,
		    chunk_id);
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		goto end;
	}

	if (close_command) {
		chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto end;
		}
	}

	/*
	 * chunk is now invalid to access as we no longer hold a reference to
	 * it; it is only kept around to compare it (by address) to the
	 * current chunk found in the session's channels.
	 */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (
			the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
			int ret;

			/*
			 * Only change the channel's chunk to NULL if it still
			 * references the chunk being closed. The channel may
			 * reference a newer channel in the case of a session
			 * rotation. When a session rotation occurs, the "next"
			 * chunk is created before the "current" chunk is closed.
			 */
			if (channel->trace_chunk != chunk) {
				continue;
			}
			ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
			if (ret) {
				/*
				 * Attempt to close the chunk on as many channels as
				 * possible.
				 */
				ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			}
		}
	}
	if (relayd_id) {
		int ret;
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		/* 'ret' is only read when 'relayd' was found (short-circuit). */
		if (!relayd || ret) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
end:
	/*
	 * Release the reference returned by the "find" operation and
	 * the session daemon's implicit reference to the chunk.
	 * Both puts are intentional; they are no-ops when chunk is null.
	 */
	lttng_trace_chunk_put(chunk);
	lttng_trace_chunk_put(chunk);

	return ret_code;
}
3654ed19 5029
/*
 * Check whether a trace chunk exists, first in the local chunk registry
 * and then, for network sessions, on the relay daemon.
 *
 * Returns:
 *   LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL / _REMOTE on a hit,
 *   LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK when not found anywhere,
 *   an error code on look-up failure.
 */
enum lttcomm_return_code
lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
{
	int ret;
	enum lttcomm_return_code ret_code;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	/* No relayd id means the trace is stored locally. */
	const bool is_local_trace = !relayd_id;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	bool chunk_exists_local, chunk_exists_remote;
	/* RCU guard protects the relayd look-up for the whole function. */
	lttng::urcu::read_lock_guard read_lock;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	DBG("Consumer trace chunk exists command: relayd_id = %s"
	    ", chunk_id = %" PRIu64,
	    relayd_id_str,
	    chunk_id);
	ret = lttng_trace_chunk_registry_chunk_exists(
		the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
	if (ret) {
		/* Internal error. */
		ERR("Failed to query the existence of a trace chunk");
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto end;
	}
	DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
	if (chunk_exists_local) {
		ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
		goto end;
	} else if (is_local_trace) {
		/* Local trace and not found locally: it does not exist. */
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	relayd = consumer_find_relayd(*relayd_id);
	if (!relayd) {
		ERR("Failed to find relayd %" PRIu64, *relayd_id);
		ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
		goto end_rcu_unlock;
	}
	DBG("Looking up existence of trace chunk on relay daemon");
	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
	ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	if (ret < 0) {
		ERR("Failed to look-up the existence of trace chunk on relay daemon");
		ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
		goto end_rcu_unlock;
	}

	ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
					 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
	DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");

end_rcu_unlock:
end:
	return ret_code;
}
5f3aff8b 5097
/*
 * Clear every stream of a monitored channel.
 *
 * Iterates over the channel's streams (keyed by channel key in the
 * per-channel-id hash table) and clears each live stream under its lock.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS, or the error returned by
 * consumer_clear_stream() on the first failure.
 */
static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	int ret;

	ht = the_consumer_data.stream_per_chan_id_ht;

	lttng::urcu::read_lock_guard read_lock;
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* A deleted node means the stream is being torn down; skip it. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		ret = consumer_clear_stream(stream);
		if (ret) {
			goto error_unlock;
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	return LTTCOMM_CONSUMERD_SUCCESS;

error_unlock:
	/* Release the lock of the stream that failed before bailing out. */
	pthread_mutex_unlock(&stream->lock);
	return ret;
}
5136
5137int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
5138{
5139 int ret;
5140
5141 DBG("Consumer clear channel %" PRIu64, channel->key);
5142
5143 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
5144 /*
5145 * Nothing to do for the metadata channel/stream.
5146 * Snapshot mechanism already take care of the metadata
5147 * handling/generation, and monitored channels only need to
5148 * have their data stream cleared..
5149 */
5150 ret = LTTCOMM_CONSUMERD_SUCCESS;
5151 goto end;
5152 }
5153
5154 if (!channel->monitor) {
5155 ret = consumer_clear_unmonitored_channel(channel);
5156 } else {
5157 ret = consumer_clear_monitored_channel(channel);
5158 }
5159end:
5160 return ret;
5161}
04ed9e10 5162
/*
 * Open (flush an empty) packet on every live data stream of a channel.
 *
 * Rejected on metadata channels. Each stream is handled under its own
 * lock; streams whose hash-table node was already deleted (teardown in
 * progress) are skipped.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS, LTTCOMM_CONSUMERD_INVALID_PARAMETERS
 * for a metadata channel, or LTTCOMM_CONSUMERD_UNKNOWN_ERROR on an
 * internal stream failure.
 */
enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream;
	enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;

	if (channel->metadata_stream) {
		ERR("Open channel packets command attempted on a metadata channel");
		ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
		goto end;
	}

	{
		/* RCU guard scoped to the stream-list traversal only. */
		lttng::urcu::read_lock_guard read_lock;
		cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
			enum consumer_stream_open_packet_status status;

			pthread_mutex_lock(&stream->lock);
			if (cds_lfht_is_node_deleted(&stream->node.node)) {
				goto next;
			}

			status = consumer_stream_open_packet(stream);
			switch (status) {
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
				DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel name = %s, session id = %" PRIu64,
				    stream->key,
				    stream->chan->name,
				    stream->chan->session_id);
				stream->opened_packet_in_current_trace_chunk = true;
				break;
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
				/* Non-fatal: the buffer is full, nothing to open. */
				DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel name = %s, session id = %" PRIu64,
				    stream->key,
				    stream->chan->name,
				    stream->chan->session_id);
				break;
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
				/*
				 * Only unexpected internal errors can lead to this
				 * failing. Report an unknown error.
				 */
				ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel id = %" PRIu64 ", channel name = %s"
				    ", session id = %" PRIu64,
				    stream->key,
				    channel->key,
				    channel->name,
				    channel->session_id);
				ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
				/* Leaves the guard's scope; the RCU lock is released. */
				goto error_unlock;
			default:
				abort();
			}

		next:
			pthread_mutex_unlock(&stream->lock);
		}
	}
end_rcu_unlock:
end:
	return ret;

error_unlock:
	/* Release the failing stream's lock, then return through 'end'. */
	pthread_mutex_unlock(&stream->lock);
	goto end_rcu_unlock;
}
/*
 * SIGBUS handler entry point for the consumer daemon; delegates to the
 * user-space tracer's handler with the faulting address.
 */
void lttng_consumer_sigbus_handle(void *addr)
{
	lttng_ustconsumer_sigbus_handle(addr);
}
This page took 0.44736 seconds and 4 git commands to generate.