Remove fcntl wrapper
[lttng-tools.git] / src / common / consumer / consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
c9e313bc
SM
11#include <common/align.hpp>
12#include <common/common.hpp>
13#include <common/compat/endian.hpp>
14#include <common/compat/poll.hpp>
15#include <common/consumer/consumer-metadata-cache.hpp>
16#include <common/consumer/consumer-stream.hpp>
17#include <common/consumer/consumer-testpoint.hpp>
18#include <common/consumer/consumer-timer.hpp>
19#include <common/consumer/consumer.hpp>
20#include <common/dynamic-array.hpp>
21#include <common/index/ctf-index.hpp>
22#include <common/index/index.hpp>
671e39d7 23#include <common/io-hint.hpp>
c9e313bc
SM
24#include <common/kernel-consumer/kernel-consumer.hpp>
25#include <common/kernel-ctl/kernel-ctl.hpp>
26#include <common/relayd/relayd.hpp>
27#include <common/sessiond-comm/relayd.hpp>
28#include <common/sessiond-comm/sessiond-comm.hpp>
29#include <common/string-utils/format.hpp>
30#include <common/time.hpp>
31#include <common/trace-chunk-registry.hpp>
32#include <common/trace-chunk.hpp>
56047f5a 33#include <common/urcu.hpp>
c9e313bc
SM
34#include <common/ust-consumer/ust-consumer.hpp>
35#include <common/utils.hpp>
3bd1e081 36
28ab034a 37#include <bin/lttng-consumerd/health-consumerd.hpp>
671e39d7 38#include <fcntl.h>
28ab034a
JG
39#include <inttypes.h>
40#include <poll.h>
41#include <pthread.h>
42#include <signal.h>
43#include <stdlib.h>
44#include <string.h>
45#include <sys/mman.h>
46#include <sys/socket.h>
47#include <sys/types.h>
48#include <unistd.h>
49
97535efa 50lttng_consumer_global_data the_consumer_data;
3bd1e081 51
d8ef542d
MD
52enum consumer_channel_action {
53 CONSUMER_CHANNEL_ADD,
a0cbdd2e 54 CONSUMER_CHANNEL_DEL,
d8ef542d
MD
55 CONSUMER_CHANNEL_QUIT,
56};
57
f1494934 58namespace {
d8ef542d
MD
59struct consumer_channel_msg {
60 enum consumer_channel_action action;
28ab034a
JG
61 struct lttng_consumer_channel *chan; /* add */
62 uint64_t key; /* del */
d8ef542d
MD
63};
64
f1494934
JG
65/*
66 * Global hash table containing respectively metadata and data streams. The
67 * stream element in this ht should only be updated by the metadata poll thread
68 * for the metadata and the data poll thread for the data.
69 */
70struct lttng_ht *metadata_ht;
71struct lttng_ht *data_ht;
72} /* namespace */
73
/* Flag used by testpoints to temporarily pause data consumption. */
int data_consumption_paused;

/*
 * Flag telling the polling threads to quit once all fds have hung up. Updated
 * by consumer_thread_receive_fds when it notices that all fds have hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
int consumer_quit;
3bd1e081 84
cd9adb8b 85static const char *get_consumer_domain()
5da88b0f 86{
fa29bfbf 87 switch (the_consumer_data.type) {
5da88b0f
MD
88 case LTTNG_CONSUMER_KERNEL:
89 return DEFAULT_KERNEL_TRACE_DIR;
90 case LTTNG_CONSUMER64_UST:
91 /* Fall-through. */
92 case LTTNG_CONSUMER32_UST:
93 return DEFAULT_UST_TRACE_DIR;
94 default:
95 abort();
96 }
97}
98
acdb9057
DG
99/*
100 * Notify a thread lttng pipe to poll back again. This usually means that some
101 * global state has changed so we just send back the thread in a poll wait
102 * call.
103 */
104static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
105{
cd9adb8b 106 struct lttng_consumer_stream *null_stream = nullptr;
acdb9057 107
a0377dfe 108 LTTNG_ASSERT(pipe);
acdb9057 109
5c7248cd
JG
110 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
111 pointer. */
acdb9057
DG
112}
113
5c635c72
MD
114static void notify_health_quit_pipe(int *pipe)
115{
6cd525e8 116 ssize_t ret;
5c635c72 117
6cd525e8
MD
118 ret = lttng_write(pipe[1], "4", 1);
119 if (ret < 1) {
5c635c72
MD
120 PERROR("write consumer health quit");
121 }
122}
123
d8ef542d 124static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
125 struct lttng_consumer_channel *chan,
126 uint64_t key,
127 enum consumer_channel_action action)
d8ef542d
MD
128{
129 struct consumer_channel_msg msg;
6cd525e8 130 ssize_t ret;
d8ef542d 131
e56251fc
DG
132 memset(&msg, 0, sizeof(msg));
133
d8ef542d
MD
134 msg.action = action;
135 msg.chan = chan;
f21dae48 136 msg.key = key;
6cd525e8
MD
137 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
138 if (ret < sizeof(msg)) {
139 PERROR("notify_channel_pipe write error");
140 }
d8ef542d
MD
141}
142
28ab034a 143void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
a0cbdd2e 144{
cd9adb8b 145 notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
a0cbdd2e
MD
146}
147
d8ef542d 148static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
28ab034a
JG
149 struct lttng_consumer_channel **chan,
150 uint64_t *key,
151 enum consumer_channel_action *action)
d8ef542d
MD
152{
153 struct consumer_channel_msg msg;
6cd525e8 154 ssize_t ret;
d8ef542d 155
6cd525e8
MD
156 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
157 if (ret < sizeof(msg)) {
158 ret = -1;
159 goto error;
d8ef542d 160 }
6cd525e8
MD
161 *action = msg.action;
162 *chan = msg.chan;
163 *key = msg.key;
164error:
165 return (int) ret;
d8ef542d
MD
166}
167
212d67a2
DG
168/*
169 * Cleanup the stream list of a channel. Those streams are not yet globally
170 * visible
171 */
172static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
173{
174 struct lttng_consumer_stream *stream, *stmp;
175
a0377dfe 176 LTTNG_ASSERT(channel);
212d67a2
DG
177
178 /* Delete streams that might have been left in the stream list. */
28ab034a 179 cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
212d67a2
DG
180 /*
181 * Once a stream is added to this list, the buffers were created so we
182 * have a guarantee that this call will succeed. Setting the monitor
183 * mode to 0 so we don't lock nor try to delete the stream from the
184 * global hash table.
185 */
186 stream->monitor = 0;
cd9adb8b 187 consumer_stream_destroy(stream, nullptr);
212d67a2
DG
188 }
189}
190
3bd1e081
MD
191/*
192 * Find a stream. The consumer_data.lock must be locked during this
193 * call.
194 */
28ab034a 195static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
3bd1e081 196{
e4421fec 197 struct lttng_ht_iter iter;
d88aee68 198 struct lttng_ht_node_u64 *node;
cd9adb8b 199 struct lttng_consumer_stream *stream = nullptr;
3bd1e081 200
a0377dfe 201 LTTNG_ASSERT(ht);
8389e4f8 202
d88aee68
DG
203 /* -1ULL keys are lookup failures */
204 if (key == (uint64_t) -1ULL) {
cd9adb8b 205 return nullptr;
7a57cf92 206 }
e4421fec 207
56047f5a 208 lttng::urcu::read_lock_guard read_lock;
6065ceec 209
d88aee68
DG
210 lttng_ht_lookup(ht, &key, &iter);
211 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 212 if (node != nullptr) {
0114db0e 213 stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
3bd1e081 214 }
e4421fec
DG
215
216 return stream;
3bd1e081
MD
217}
218
da009f2c 219static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
7ad0a0cb
MD
220{
221 struct lttng_consumer_stream *stream;
222
56047f5a 223 lttng::urcu::read_lock_guard read_lock;
ffe60014 224 stream = find_stream(key, ht);
04253271 225 if (stream) {
da009f2c 226 stream->key = (uint64_t) -1ULL;
04253271
MD
227 /*
228 * We don't want the lookup to match, but we still need
229 * to iterate on this stream when iterating over the hash table. Just
230 * change the node key.
231 */
da009f2c 232 stream->node.key = (uint64_t) -1ULL;
04253271 233 }
7ad0a0cb
MD
234}
235
d56db448
DG
236/*
237 * Return a channel object for the given key.
238 *
239 * RCU read side lock MUST be acquired before calling this function and
240 * protects the channel ptr.
241 */
d88aee68 242struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
3bd1e081 243{
e4421fec 244 struct lttng_ht_iter iter;
d88aee68 245 struct lttng_ht_node_u64 *node;
cd9adb8b 246 struct lttng_consumer_channel *channel = nullptr;
3bd1e081 247
48b7cdc2
FD
248 ASSERT_RCU_READ_LOCKED();
249
d88aee68
DG
250 /* -1ULL keys are lookup failures */
251 if (key == (uint64_t) -1ULL) {
cd9adb8b 252 return nullptr;
7a57cf92 253 }
e4421fec 254
fa29bfbf 255 lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
d88aee68 256 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 257 if (node != nullptr) {
0114db0e 258 channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
3bd1e081 259 }
e4421fec
DG
260
261 return channel;
3bd1e081
MD
262}
263
b5a6470f
DG
264/*
265 * There is a possibility that the consumer does not have enough time between
266 * the close of the channel on the session daemon and the cleanup in here thus
267 * once we have a channel add with an existing key, we know for sure that this
268 * channel will eventually get cleaned up by all streams being closed.
269 *
270 * This function just nullifies the already existing channel key.
271 */
272static void steal_channel_key(uint64_t key)
273{
274 struct lttng_consumer_channel *channel;
275
56047f5a 276 lttng::urcu::read_lock_guard read_lock;
b5a6470f
DG
277 channel = consumer_find_channel(key);
278 if (channel) {
279 channel->key = (uint64_t) -1ULL;
280 /*
281 * We don't want the lookup to match, but we still need to iterate on
282 * this channel when iterating over the hash table. Just change the
283 * node key.
284 */
285 channel->node.key = (uint64_t) -1ULL;
286 }
b5a6470f
DG
287}
288
ffe60014 289static void free_channel_rcu(struct rcu_head *head)
702b1ea4 290{
28ab034a 291 struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
ffe60014 292 struct lttng_consumer_channel *channel =
0114db0e 293 lttng::utils::container_of(node, &lttng_consumer_channel::node);
702b1ea4 294
fa29bfbf 295 switch (the_consumer_data.type) {
b83e03c4
MD
296 case LTTNG_CONSUMER_KERNEL:
297 break;
298 case LTTNG_CONSUMER32_UST:
299 case LTTNG_CONSUMER64_UST:
300 lttng_ustconsumer_free_channel(channel);
301 break;
302 default:
303 ERR("Unknown consumer_data type");
304 abort();
305 }
ffe60014 306 free(channel);
702b1ea4
MD
307}
308
00e2e675
DG
309/*
310 * RCU protected relayd socket pair free.
311 */
ffe60014 312static void free_relayd_rcu(struct rcu_head *head)
00e2e675 313{
28ab034a 314 struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
00e2e675 315 struct consumer_relayd_sock_pair *relayd =
0114db0e 316 lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
00e2e675 317
8994307f
DG
318 /*
319 * Close all sockets. This is done in the call RCU since we don't want the
320 * socket fds to be reassigned thus potentially creating bad state of the
321 * relayd object.
322 *
323 * We do not have to lock the control socket mutex here since at this stage
324 * there is no one referencing to this relayd object.
325 */
326 (void) relayd_close(&relayd->control_sock);
327 (void) relayd_close(&relayd->data_sock);
328
3a84e2f3 329 pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
00e2e675
DG
330 free(relayd);
331}
332
333/*
334 * Destroy and free relayd socket pair object.
00e2e675 335 */
51230d70 336void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
337{
338 int ret;
339 struct lttng_ht_iter iter;
340
cd9adb8b 341 if (relayd == nullptr) {
173af62f
DG
342 return;
343 }
344
00e2e675
DG
345 DBG("Consumer destroy and close relayd socket pair");
346
347 iter.iter.node = &relayd->node.node;
fa29bfbf 348 ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
173af62f 349 if (ret != 0) {
8994307f 350 /* We assume the relayd is being or is destroyed */
173af62f
DG
351 return;
352 }
00e2e675 353
00e2e675 354 /* RCU free() call */
ffe60014
DG
355 call_rcu(&relayd->node.head, free_relayd_rcu);
356}
357
358/*
359 * Remove a channel from the global list protected by a mutex. This function is
360 * also responsible for freeing its data structures.
361 */
362void consumer_del_channel(struct lttng_consumer_channel *channel)
363{
ffe60014
DG
364 struct lttng_ht_iter iter;
365
d88aee68 366 DBG("Consumer delete channel key %" PRIu64, channel->key);
ffe60014 367
fa29bfbf 368 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 369 pthread_mutex_lock(&channel->lock);
ffe60014 370
212d67a2
DG
371 /* Destroy streams that might have been left in the stream list. */
372 clean_channel_stream_list(channel);
51e762e5 373
d3e2ba59
JD
374 if (channel->live_timer_enabled == 1) {
375 consumer_timer_live_stop(channel);
376 }
e9404c27
JG
377 if (channel->monitor_timer_enabled == 1) {
378 consumer_timer_monitor_stop(channel);
379 }
d3e2ba59 380
319dcddc
JG
381 /*
382 * Send a last buffer statistics sample to the session daemon
383 * to ensure it tracks the amount of data consumed by this channel.
384 */
385 sample_and_send_channel_buffer_stats(channel);
386
fa29bfbf 387 switch (the_consumer_data.type) {
ffe60014
DG
388 case LTTNG_CONSUMER_KERNEL:
389 break;
390 case LTTNG_CONSUMER32_UST:
391 case LTTNG_CONSUMER64_UST:
392 lttng_ustconsumer_del_channel(channel);
393 break;
394 default:
395 ERR("Unknown consumer_data type");
a0377dfe 396 abort();
ffe60014
DG
397 goto end;
398 }
399
d2956687 400 lttng_trace_chunk_put(channel->trace_chunk);
cd9adb8b 401 channel->trace_chunk = nullptr;
5c3892a6 402
d2956687
JG
403 if (channel->is_published) {
404 int ret;
405
56047f5a 406 lttng::urcu::read_lock_guard read_lock;
d2956687 407 iter.iter.node = &channel->node.node;
fa29bfbf 408 ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
a0377dfe 409 LTTNG_ASSERT(!ret);
ffe60014 410
d2956687 411 iter.iter.node = &channel->channels_by_session_id_ht_node.node;
28ab034a 412 ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
a0377dfe 413 LTTNG_ASSERT(!ret);
d2956687
JG
414 }
415
b6921a17
JG
416 channel->is_deleted = true;
417 call_rcu(&channel->node.head, free_channel_rcu);
ffe60014 418end:
a9838785 419 pthread_mutex_unlock(&channel->lock);
fa29bfbf 420 pthread_mutex_unlock(&the_consumer_data.lock);
00e2e675
DG
421}
422
228b5bf7
DG
423/*
424 * Iterate over the relayd hash table and destroy each element. Finally,
425 * destroy the whole hash table.
426 */
cd9adb8b 427static void cleanup_relayd_ht()
228b5bf7
DG
428{
429 struct lttng_ht_iter iter;
430 struct consumer_relayd_sock_pair *relayd;
431
56047f5a
JG
432 {
433 lttng::urcu::read_lock_guard read_lock;
228b5bf7 434
56047f5a
JG
435 cds_lfht_for_each_entry (
436 the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
437 consumer_destroy_relayd(relayd);
438 }
228b5bf7
DG
439 }
440
fa29bfbf 441 lttng_ht_destroy(the_consumer_data.relayd_ht);
228b5bf7
DG
442}
443
8994307f
DG
444/*
445 * Update the end point status of all streams having the given network sequence
446 * index (relayd index).
447 *
448 * It's atomically set without having the stream mutex locked which is fine
449 * because we handle the write/read race with a pipe wakeup for each thread.
450 */
da009f2c 451static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
28ab034a 452 enum consumer_endpoint_status status)
8994307f
DG
453{
454 struct lttng_ht_iter iter;
455 struct lttng_consumer_stream *stream;
456
da009f2c 457 DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
8994307f 458
56047f5a 459 lttng::urcu::read_lock_guard read_lock;
8994307f
DG
460
461 /* Let's begin with metadata */
28ab034a 462 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
463 if (stream->net_seq_idx == net_seq_idx) {
464 uatomic_set(&stream->endpoint_status, status);
465 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
466 }
467 }
468
469 /* Follow up by the data streams */
28ab034a 470 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
471 if (stream->net_seq_idx == net_seq_idx) {
472 uatomic_set(&stream->endpoint_status, status);
473 DBG("Delete flag set to data stream %d", stream->wait_fd);
474 }
475 }
8994307f
DG
476}
477
478/*
479 * Cleanup a relayd object by flagging every associated streams for deletion,
480 * destroying the object meaning removing it from the relayd hash table,
481 * closing the sockets and freeing the memory in a RCU call.
482 *
483 * If a local data context is available, notify the threads that the streams'
484 * state have changed.
485 */
9276e5c8 486void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
8994307f 487{
da009f2c 488 uint64_t netidx;
8994307f 489
a0377dfe 490 LTTNG_ASSERT(relayd);
8994307f 491
97535efa 492 DBG("Cleaning up relayd object ID %" PRIu64, relayd->net_seq_idx);
9617607b 493
8994307f
DG
494 /* Save the net sequence index before destroying the object */
495 netidx = relayd->net_seq_idx;
496
497 /*
498 * Delete the relayd from the relayd hash table, close the sockets and free
499 * the object in a RCU call.
500 */
51230d70 501 consumer_destroy_relayd(relayd);
8994307f
DG
502
503 /* Set inactive endpoint to all streams */
504 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
505
506 /*
507 * With a local data context, notify the threads that the streams' state
508 * have changed. The write() action on the pipe acts as an "implicit"
509 * memory barrier ordering the updates of the end point status from the
510 * read of this status which happens AFTER receiving this notify.
511 */
9276e5c8
JR
512 notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
513 notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
8994307f
DG
514}
515
a6ba4fe1
DG
516/*
517 * Flag a relayd socket pair for destruction. Destroy it if the refcount
518 * reaches zero.
519 *
520 * RCU read side lock MUST be acquired before calling this function.
521 */
522void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
523{
a0377dfe 524 LTTNG_ASSERT(relayd);
48b7cdc2 525 ASSERT_RCU_READ_LOCKED();
a6ba4fe1
DG
526
527 /* Set destroy flag for this object */
528 uatomic_set(&relayd->destroy_flag, 1);
529
530 /* Destroy the relayd if refcount is 0 */
531 if (uatomic_read(&relayd->refcount) == 0) {
51230d70 532 consumer_destroy_relayd(relayd);
a6ba4fe1
DG
533 }
534}
535
3bd1e081 536/*
1d1a276c
DG
537 * Completely destroy the stream from every visible data structure and from
538 * the given hash table, if one is provided.
539 *
540 * Once this call returns, the stream object is no longer usable nor visible.
3bd1e081 541 */
28ab034a 542void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
3bd1e081 543{
1d1a276c 544 consumer_stream_destroy(stream, ht);
3bd1e081
MD
545}
546
5ab66908
MD
547/*
548 * XXX naming of del vs destroy is all mixed up.
549 */
550void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
551{
552 consumer_stream_destroy(stream, data_ht);
553}
554
555void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
556{
557 consumer_stream_destroy(stream, metadata_ht);
558}
559
28ab034a
JG
560void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
561 struct lttng_consumer_channel *channel)
d9a2e16e 562{
28ab034a 563 stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
d9a2e16e
JD
564}
565
3bd1e081
MD
566/*
567 * Add a stream to the global list protected by a mutex.
568 */
66d583dc 569void consumer_add_data_stream(struct lttng_consumer_stream *stream)
3bd1e081 570{
5ab66908 571 struct lttng_ht *ht = data_ht;
3bd1e081 572
a0377dfe
FD
573 LTTNG_ASSERT(stream);
574 LTTNG_ASSERT(ht);
c77fc10a 575
d88aee68 576 DBG3("Adding consumer stream %" PRIu64, stream->key);
e316aad5 577
fa29bfbf 578 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 579 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 580 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 581 pthread_mutex_lock(&stream->lock);
56047f5a 582 lttng::urcu::read_lock_guard read_lock;
e316aad5 583
43c34bc3 584 /* Steal stream identifier to avoid having streams with the same key */
ffe60014 585 steal_stream_key(stream->key, ht);
43c34bc3 586
d88aee68 587 lttng_ht_add_unique_u64(ht, &stream->node);
00e2e675 588
28ab034a 589 lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
d8ef542d 590
ca22feea
DG
591 /*
592 * Add stream to the stream_list_ht of the consumer data. No need to steal
593 * the key since the HT does not use it and we allow to add redundant keys
594 * into this table.
595 */
28ab034a 596 lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 597
e316aad5 598 /*
ffe60014
DG
599 * When nb_init_stream_left reaches 0, we don't need to trigger any action
600 * in terms of destroying the associated channel, because the action that
e316aad5
DG
601 * causes the count to become 0 also causes a stream to be added. The
602 * channel deletion will thus be triggered by the following removal of this
603 * stream.
604 */
ffe60014 605 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
606 /* Increment refcount before decrementing nb_init_stream_left */
607 cmm_smp_wmb();
ffe60014 608 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
609 }
610
611 /* Update consumer data once the node is inserted. */
fa29bfbf
SM
612 the_consumer_data.stream_count++;
613 the_consumer_data.need_update = 1;
3bd1e081 614
2e818a6a 615 pthread_mutex_unlock(&stream->lock);
ec6ea7d0 616 pthread_mutex_unlock(&stream->chan->timer_lock);
a9838785 617 pthread_mutex_unlock(&stream->chan->lock);
fa29bfbf 618 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
619}
620
00e2e675 621/*
3f8e211f
DG
622 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
623 * be acquired before calling this.
00e2e675 624 */
d09e1200 625static int add_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
626{
627 int ret = 0;
d88aee68 628 struct lttng_ht_node_u64 *node;
00e2e675
DG
629 struct lttng_ht_iter iter;
630
a0377dfe 631 LTTNG_ASSERT(relayd);
48b7cdc2 632 ASSERT_RCU_READ_LOCKED();
00e2e675 633
28ab034a 634 lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
d88aee68 635 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 636 if (node != nullptr) {
00e2e675
DG
637 goto end;
638 }
fa29bfbf 639 lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
00e2e675 640
00e2e675
DG
641end:
642 return ret;
643}
644
645/*
646 * Allocate and return a consumer relayd socket.
647 */
28ab034a 648static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
00e2e675 649{
cd9adb8b 650 struct consumer_relayd_sock_pair *obj = nullptr;
00e2e675 651
da009f2c
MD
652 /* net sequence index of -1 is a failure */
653 if (net_seq_idx == (uint64_t) -1ULL) {
00e2e675
DG
654 goto error;
655 }
656
64803277 657 obj = zmalloc<consumer_relayd_sock_pair>();
cd9adb8b 658 if (obj == nullptr) {
00e2e675
DG
659 PERROR("zmalloc relayd sock");
660 goto error;
661 }
662
663 obj->net_seq_idx = net_seq_idx;
664 obj->refcount = 0;
173af62f 665 obj->destroy_flag = 0;
f96e4545
MD
666 obj->control_sock.sock.fd = -1;
667 obj->data_sock.sock.fd = -1;
d88aee68 668 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
cd9adb8b 669 pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
00e2e675
DG
670
671error:
672 return obj;
673}
674
675/*
676 * Find a relayd socket pair in the global consumer data.
677 *
678 * Return the object if found else NULL.
b0b335c8
MD
679 * RCU read-side lock must be held across this call and while using the
680 * returned object.
00e2e675 681 */
d88aee68 682struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
00e2e675
DG
683{
684 struct lttng_ht_iter iter;
d88aee68 685 struct lttng_ht_node_u64 *node;
cd9adb8b 686 struct consumer_relayd_sock_pair *relayd = nullptr;
00e2e675 687
48b7cdc2
FD
688 ASSERT_RCU_READ_LOCKED();
689
00e2e675 690 /* Negative keys are lookup failures */
d88aee68 691 if (key == (uint64_t) -1ULL) {
00e2e675
DG
692 goto error;
693 }
694
fa29bfbf 695 lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
d88aee68 696 node = lttng_ht_iter_get_node_u64(&iter);
cd9adb8b 697 if (node != nullptr) {
0114db0e 698 relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
00e2e675
DG
699 }
700
00e2e675
DG
701error:
702 return relayd;
703}
704
10a50311
JD
705/*
706 * Find a relayd and send the stream
707 *
708 * Returns 0 on success, < 0 on error
709 */
28ab034a 710int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
10a50311
JD
711{
712 int ret = 0;
713 struct consumer_relayd_sock_pair *relayd;
714
a0377dfe
FD
715 LTTNG_ASSERT(stream);
716 LTTNG_ASSERT(stream->net_seq_idx != -1ULL);
717 LTTNG_ASSERT(path);
10a50311
JD
718
719 /* The stream is not metadata. Get relayd reference if exists. */
56047f5a 720 lttng::urcu::read_lock_guard read_lock;
10a50311 721 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 722 if (relayd != nullptr) {
10a50311
JD
723 /* Add stream on the relayd */
724 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a
JG
725 ret = relayd_add_stream(&relayd->control_sock,
726 stream->name,
727 get_consumer_domain(),
728 path,
729 &stream->relayd_stream_id,
730 stream->chan->tracefile_size,
731 stream->chan->tracefile_count,
732 stream->trace_chunk);
10a50311
JD
733 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
734 if (ret < 0) {
28ab034a
JG
735 ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
736 relayd->net_seq_idx);
9276e5c8 737 lttng_consumer_cleanup_relayd(relayd);
10a50311
JD
738 goto end;
739 }
1c20f0e2 740
10a50311 741 uatomic_inc(&relayd->refcount);
d01178b6 742 stream->sent_to_relayd = 1;
10a50311
JD
743 } else {
744 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
28ab034a
JG
745 stream->key,
746 stream->net_seq_idx);
10a50311
JD
747 ret = -1;
748 goto end;
749 }
750
751 DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
28ab034a
JG
752 stream->name,
753 stream->key,
754 stream->net_seq_idx);
10a50311
JD
755
756end:
10a50311
JD
757 return ret;
758}
759
a4baae1b
JD
760/*
761 * Find a relayd and send the streams sent message
762 *
763 * Returns 0 on success, < 0 on error
764 */
765int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
766{
767 int ret = 0;
768 struct consumer_relayd_sock_pair *relayd;
769
a0377dfe 770 LTTNG_ASSERT(net_seq_idx != -1ULL);
a4baae1b
JD
771
772 /* The stream is not metadata. Get relayd reference if exists. */
56047f5a 773 lttng::urcu::read_lock_guard read_lock;
a4baae1b 774 relayd = consumer_find_relayd(net_seq_idx);
cd9adb8b 775 if (relayd != nullptr) {
a4baae1b
JD
776 /* Add stream on the relayd */
777 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
778 ret = relayd_streams_sent(&relayd->control_sock);
779 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
780 if (ret < 0) {
28ab034a
JG
781 ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
782 relayd->net_seq_idx);
9276e5c8 783 lttng_consumer_cleanup_relayd(relayd);
a4baae1b
JD
784 goto end;
785 }
786 } else {
28ab034a 787 ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
a4baae1b
JD
788 ret = -1;
789 goto end;
790 }
791
792 ret = 0;
793 DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
794
795end:
a4baae1b
JD
796 return ret;
797}
798
10a50311
JD
799/*
800 * Find a relayd and close the stream
801 */
802void close_relayd_stream(struct lttng_consumer_stream *stream)
803{
804 struct consumer_relayd_sock_pair *relayd;
805
806 /* The stream is not metadata. Get relayd reference if exists. */
56047f5a 807 lttng::urcu::read_lock_guard read_lock;
10a50311
JD
808 relayd = consumer_find_relayd(stream->net_seq_idx);
809 if (relayd) {
810 consumer_stream_relayd_close(stream, relayd);
811 }
10a50311
JD
812}
813
00e2e675
DG
814/*
815 * Handle stream for relayd transmission if the stream applies for network
816 * streaming where the net sequence index is set.
817 *
818 * Return destination file descriptor or negative value on error.
819 */
6197aea7 820static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
28ab034a
JG
821 size_t data_size,
822 unsigned long padding,
823 struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
824{
825 int outfd = -1, ret;
00e2e675
DG
826 struct lttcomm_relayd_data_hdr data_hdr;
827
828 /* Safety net */
a0377dfe
FD
829 LTTNG_ASSERT(stream);
830 LTTNG_ASSERT(relayd);
00e2e675
DG
831
832 /* Reset data header */
833 memset(&data_hdr, 0, sizeof(data_hdr));
834
00e2e675
DG
835 if (stream->metadata_flag) {
836 /* Caller MUST acquire the relayd control socket lock */
837 ret = relayd_send_metadata(&relayd->control_sock, data_size);
838 if (ret < 0) {
839 goto error;
840 }
841
842 /* Metadata are always sent on the control socket. */
6151a90f 843 outfd = relayd->control_sock.sock.fd;
00e2e675
DG
844 } else {
845 /* Set header with stream information */
846 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
847 data_hdr.data_size = htobe32(data_size);
1d4dfdef 848 data_hdr.padding_size = htobe32(padding);
c35f9726 849
39df6d9f
DG
850 /*
851 * Note that net_seq_num below is assigned with the *current* value of
852 * next_net_seq_num and only after that the next_net_seq_num will be
853 * increment. This is why when issuing a command on the relayd using
854 * this next value, 1 should always be substracted in order to compare
855 * the last seen sequence number on the relayd side to the last sent.
856 */
3604f373 857 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
00e2e675
DG
858 /* Other fields are zeroed previously */
859
28ab034a 860 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
00e2e675
DG
861 if (ret < 0) {
862 goto error;
863 }
864
3604f373
DG
865 ++stream->next_net_seq_num;
866
00e2e675 867 /* Set to go on data socket */
6151a90f 868 outfd = relayd->data_sock.sock.fd;
00e2e675
DG
869 }
870
871error:
872 return outfd;
873}
874
b1316da1
JG
875/*
876 * Write a character on the metadata poll pipe to wake the metadata thread.
877 * Returns 0 on success, -1 on error.
878 */
879int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
880{
881 int ret = 0;
882
28ab034a 883 DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
b1316da1
JG
884 if (channel->monitor && channel->metadata_stream) {
885 const char dummy = 'c';
28ab034a
JG
886 const ssize_t write_ret =
887 lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);
b1316da1
JG
888
889 if (write_ret < 1) {
890 if (errno == EWOULDBLOCK) {
891 /*
892 * This is fine, the metadata poll thread
893 * is having a hard time keeping-up, but
894 * it will eventually wake-up and consume
895 * the available data.
896 */
897 ret = 0;
898 } else {
899 PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
900 ret = -1;
901 goto end;
902 }
903 }
904 }
905
906end:
907 return ret;
908}
909
d2956687
JG
910/*
911 * Trigger a dump of the metadata content. Following/during the successful
912 * completion of this call, the metadata poll thread will start receiving
913 * metadata packets to consume.
914 *
915 * The caller must hold the channel and stream locks.
916 */
28ab034a 917static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
d2956687
JG
918{
919 int ret;
920
921 ASSERT_LOCKED(stream->chan->lock);
922 ASSERT_LOCKED(stream->lock);
a0377dfe
FD
923 LTTNG_ASSERT(stream->metadata_flag);
924 LTTNG_ASSERT(stream->chan->trace_chunk);
d2956687 925
fa29bfbf 926 switch (the_consumer_data.type) {
d2956687
JG
927 case LTTNG_CONSUMER_KERNEL:
928 /*
929 * Reset the position of what has been read from the
930 * metadata cache to 0 so we can dump it again.
931 */
932 ret = kernctl_metadata_cache_dump(stream->wait_fd);
933 break;
934 case LTTNG_CONSUMER32_UST:
935 case LTTNG_CONSUMER64_UST:
936 /*
937 * Reset the position pushed from the metadata cache so it
938 * will write from the beginning on the next push.
939 */
940 stream->ust_metadata_pushed = 0;
941 ret = consumer_metadata_wakeup_pipe(stream->chan);
942 break;
943 default:
944 ERR("Unknown consumer_data type");
945 abort();
946 }
947 if (ret < 0) {
948 ERR("Failed to dump the metadata cache");
949 }
950 return ret;
951}
952
28ab034a
JG
953static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
954 struct lttng_trace_chunk *new_trace_chunk)
d2956687 955{
d2956687 956 pthread_mutex_lock(&channel->lock);
b6921a17
JG
957 if (channel->is_deleted) {
958 /*
959 * The channel has been logically deleted and should no longer
960 * be used. It has released its reference to its current trace
961 * chunk and should not acquire a new one.
962 *
963 * Return success as there is nothing for the caller to do.
964 */
965 goto end;
966 }
d2956687
JG
967
968 /*
969 * The acquisition of the reference cannot fail (barring
970 * a severe internal error) since a reference to the published
971 * chunk is already held by the caller.
972 */
973 if (new_trace_chunk) {
28ab034a 974 const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);
d2956687 975
a0377dfe 976 LTTNG_ASSERT(acquired_reference);
d2956687
JG
977 }
978
979 lttng_trace_chunk_put(channel->trace_chunk);
980 channel->trace_chunk = new_trace_chunk;
d2956687
JG
981end:
982 pthread_mutex_unlock(&channel->lock);
ce1aa6fe 983 return 0;
d2956687
JG
984}
985
3bd1e081 986/*
ffe60014
DG
987 * Allocate and return a new lttng_consumer_channel object using the given key
988 * to initialize the hash table node.
989 *
990 * On error, return NULL.
3bd1e081 991 */
886224ff 992struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
28ab034a
JG
993 uint64_t session_id,
994 const uint64_t *chunk_id,
995 const char *pathname,
996 const char *name,
997 uint64_t relayd_id,
998 enum lttng_event_output output,
999 uint64_t tracefile_size,
1000 uint64_t tracefile_count,
1001 uint64_t session_id_per_pid,
1002 unsigned int monitor,
1003 unsigned int live_timer_interval,
1004 bool is_in_live_session,
1005 const char *root_shm_path,
1006 const char *shm_path)
3bd1e081 1007{
cd9adb8b
JG
1008 struct lttng_consumer_channel *channel = nullptr;
1009 struct lttng_trace_chunk *trace_chunk = nullptr;
d2956687
JG
1010
1011 if (chunk_id) {
1012 trace_chunk = lttng_trace_chunk_registry_find_chunk(
28ab034a 1013 the_consumer_data.chunk_registry, session_id, *chunk_id);
d2956687
JG
1014 if (!trace_chunk) {
1015 ERR("Failed to find trace chunk reference during creation of channel");
1016 goto end;
1017 }
1018 }
3bd1e081 1019
64803277 1020 channel = zmalloc<lttng_consumer_channel>();
cd9adb8b 1021 if (channel == nullptr) {
7a57cf92 1022 PERROR("malloc struct lttng_consumer_channel");
3bd1e081
MD
1023 goto end;
1024 }
ffe60014
DG
1025
1026 channel->key = key;
3bd1e081 1027 channel->refcount = 0;
ffe60014 1028 channel->session_id = session_id;
1950109e 1029 channel->session_id_per_pid = session_id_per_pid;
ffe60014 1030 channel->relayd_id = relayd_id;
1624d5b7
JD
1031 channel->tracefile_size = tracefile_size;
1032 channel->tracefile_count = tracefile_count;
2bba9e53 1033 channel->monitor = monitor;
ecc48a90 1034 channel->live_timer_interval = live_timer_interval;
a2814ea7 1035 channel->is_live = is_in_live_session;
cd9adb8b
JG
1036 pthread_mutex_init(&channel->lock, nullptr);
1037 pthread_mutex_init(&channel->timer_lock, nullptr);
ffe60014 1038
0c759fc9
DG
1039 switch (output) {
1040 case LTTNG_EVENT_SPLICE:
1041 channel->output = CONSUMER_CHANNEL_SPLICE;
1042 break;
1043 case LTTNG_EVENT_MMAP:
1044 channel->output = CONSUMER_CHANNEL_MMAP;
1045 break;
1046 default:
a0377dfe 1047 abort();
0c759fc9 1048 free(channel);
cd9adb8b 1049 channel = nullptr;
0c759fc9
DG
1050 goto end;
1051 }
1052
07b86b52
JD
1053 /*
1054 * In monitor mode, the streams associated with the channel will be put in
1055 * a special list ONLY owned by this channel. So, the refcount is set to 1
1056 * here meaning that the channel itself has streams that are referenced.
1057 *
1058 * On a channel deletion, once the channel is no longer visible, the
1059 * refcount is decremented and checked for a zero value to delete it. With
1060 * streams in no monitor mode, it will now be safe to destroy the channel.
1061 */
1062 if (!channel->monitor) {
1063 channel->refcount = 1;
1064 }
1065
ffe60014
DG
1066 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
1067 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
1068
1069 strncpy(channel->name, name, sizeof(channel->name));
1070 channel->name[sizeof(channel->name) - 1] = '\0';
1071
3d071855
MD
1072 if (root_shm_path) {
1073 strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
1074 channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
1075 }
d7ba1388
MD
1076 if (shm_path) {
1077 strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
1078 channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
1079 }
1080
d88aee68 1081 lttng_ht_node_init_u64(&channel->node, channel->key);
28ab034a 1082 lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);
d8ef542d
MD
1083
1084 channel->wait_fd = -1;
ffe60014
DG
1085 CDS_INIT_LIST_HEAD(&channel->streams.head);
1086
d2956687 1087 if (trace_chunk) {
28ab034a 1088 int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
d2956687
JG
1089 if (ret) {
1090 goto error;
1091 }
1092 }
1093
62a7b8ed 1094 DBG("Allocated channel (key %" PRIu64 ")", channel->key);
3bd1e081 1095
3bd1e081 1096end:
d2956687 1097 lttng_trace_chunk_put(trace_chunk);
3bd1e081 1098 return channel;
d2956687
JG
1099error:
1100 consumer_del_channel(channel);
cd9adb8b 1101 channel = nullptr;
d2956687 1102 goto end;
3bd1e081
MD
1103}
1104
1105/*
1106 * Add a channel to the global list protected by a mutex.
821fffb2 1107 *
b5a6470f 1108 * Always return 0 indicating success.
3bd1e081 1109 */
d8ef542d 1110int consumer_add_channel(struct lttng_consumer_channel *channel,
28ab034a 1111 struct lttng_consumer_local_data *ctx)
3bd1e081 1112{
fa29bfbf 1113 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 1114 pthread_mutex_lock(&channel->lock);
ec6ea7d0 1115 pthread_mutex_lock(&channel->timer_lock);
c77fc10a 1116
b5a6470f
DG
1117 /*
1118 * This gives us a guarantee that the channel we are about to add to the
1119 * channel hash table will be unique. See this function comment on the why
1120 * we need to steel the channel key at this stage.
1121 */
1122 steal_channel_key(channel->key);
c77fc10a 1123
56047f5a 1124 lttng::urcu::read_lock_guard read_lock;
fa29bfbf
SM
1125 lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
1126 lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
28ab034a 1127 &channel->channels_by_session_id_ht_node);
d2956687 1128 channel->is_published = true;
b5a6470f 1129
ec6ea7d0 1130 pthread_mutex_unlock(&channel->timer_lock);
a9838785 1131 pthread_mutex_unlock(&channel->lock);
fa29bfbf 1132 pthread_mutex_unlock(&the_consumer_data.lock);
702b1ea4 1133
b5a6470f 1134 if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
a0cbdd2e 1135 notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
d8ef542d 1136 }
b5a6470f
DG
1137
1138 return 0;
3bd1e081
MD
1139}
1140
1141/*
1142 * Allocate the pollfd structure and the local view of the out fds to avoid
1143 * doing a lookup in the linked list and concurrency issues when writing is
1144 * needed. Called with consumer_data.lock held.
1145 *
1146 * Returns the number of fds in the structures.
1147 */
ffe60014 1148static int update_poll_array(struct lttng_consumer_local_data *ctx,
28ab034a
JG
1149 struct pollfd **pollfd,
1150 struct lttng_consumer_stream **local_stream,
1151 struct lttng_ht *ht,
1152 int *nb_inactive_fd)
3bd1e081 1153{
3bd1e081 1154 int i = 0;
e4421fec
DG
1155 struct lttng_ht_iter iter;
1156 struct lttng_consumer_stream *stream;
3bd1e081 1157
a0377dfe
FD
1158 LTTNG_ASSERT(ctx);
1159 LTTNG_ASSERT(ht);
1160 LTTNG_ASSERT(pollfd);
1161 LTTNG_ASSERT(local_stream);
ffe60014 1162
3bd1e081 1163 DBG("Updating poll fd array");
9a2fcf78 1164 *nb_inactive_fd = 0;
56047f5a
JG
1165
1166 {
1167 lttng::urcu::read_lock_guard read_lock;
1168 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1169 /*
1170 * Only active streams with an active end point can be added to the
1171 * poll set and local stream storage of the thread.
1172 *
1173 * There is a potential race here for endpoint_status to be updated
1174 * just after the check. However, this is OK since the stream(s) will
1175 * be deleted once the thread is notified that the end point state has
1176 * changed where this function will be called back again.
1177 *
1178 * We track the number of inactive FDs because they still need to be
1179 * closed by the polling thread after a wakeup on the data_pipe or
1180 * metadata_pipe.
1181 */
1182 if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
1183 (*nb_inactive_fd)++;
1184 continue;
1185 }
1186
1187 (*pollfd)[i].fd = stream->wait_fd;
1188 (*pollfd)[i].events = POLLIN | POLLPRI;
1189 local_stream[i] = stream;
1190 i++;
3bd1e081 1191 }
3bd1e081
MD
1192 }
1193
1194 /*
50f8ae69 1195 * Insert the consumer_data_pipe at the end of the array and don't
3bd1e081
MD
1196 * increment i so nb_fd is the number of real FD.
1197 */
acdb9057 1198 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
509bb1cf 1199 (*pollfd)[i].events = POLLIN | POLLPRI;
02b3d176
DG
1200
1201 (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
1202 (*pollfd)[i + 1].events = POLLIN | POLLPRI;
3bd1e081
MD
1203 return i;
1204}
1205
1206/*
84382d49
MD
1207 * Poll on the should_quit pipe and the command socket return -1 on
1208 * error, 1 if should exit, 0 if data is available on the command socket
3bd1e081
MD
1209 */
1210int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1211{
1212 int num_rdy;
1213
88f2b785 1214restart:
3bd1e081
MD
1215 num_rdy = poll(consumer_sockpoll, 2, -1);
1216 if (num_rdy == -1) {
88f2b785
MD
1217 /*
1218 * Restart interrupted system call.
1219 */
1220 if (errno == EINTR) {
1221 goto restart;
1222 }
7a57cf92 1223 PERROR("Poll error");
84382d49 1224 return -1;
3bd1e081 1225 }
509bb1cf 1226 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
3bd1e081 1227 DBG("consumer_should_quit wake up");
84382d49 1228 return 1;
3bd1e081
MD
1229 }
1230 return 0;
3bd1e081
MD
1231}
1232
1233/*
1234 * Set the error socket.
1235 */
28ab034a 1236void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
3bd1e081
MD
1237{
1238 ctx->consumer_error_socket = sock;
1239}
1240
1241/*
1242 * Set the command socket path.
1243 */
28ab034a 1244void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
3bd1e081
MD
1245{
1246 ctx->consumer_command_sock_path = sock;
1247}
1248
1249/*
1250 * Send return code to the session daemon.
1251 * If the socket is not defined, we return 0, it is not a fatal error
1252 */
ffe60014 1253int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
3bd1e081
MD
1254{
1255 if (ctx->consumer_error_socket > 0) {
28ab034a
JG
1256 return lttcomm_send_unix_sock(
1257 ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
3bd1e081
MD
1258 }
1259
1260 return 0;
1261}
1262
1263/*
228b5bf7
DG
1264 * Close all the tracefiles and stream fds and MUST be called when all
1265 * instances are destroyed i.e. when all threads were joined and are ended.
3bd1e081 1266 */
cd9adb8b 1267void lttng_consumer_cleanup()
3bd1e081 1268{
e4421fec 1269 struct lttng_ht_iter iter;
ffe60014 1270 struct lttng_consumer_channel *channel;
e10aec8f 1271 unsigned int trace_chunks_left;
6065ceec 1272
56047f5a
JG
1273 {
1274 lttng::urcu::read_lock_guard read_lock;
3bd1e081 1275
56047f5a
JG
1276 cds_lfht_for_each_entry (
1277 the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
1278 consumer_del_channel(channel);
1279 }
3bd1e081 1280 }
6065ceec 1281
fa29bfbf
SM
1282 lttng_ht_destroy(the_consumer_data.channel_ht);
1283 lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
228b5bf7
DG
1284
1285 cleanup_relayd_ht();
1286
fa29bfbf 1287 lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);
d8ef542d 1288
228b5bf7
DG
1289 /*
1290 * This HT contains streams that are freed by either the metadata thread or
1291 * the data thread so we do *nothing* on the hash table and simply destroy
1292 * it.
1293 */
fa29bfbf 1294 lttng_ht_destroy(the_consumer_data.stream_list_ht);
28cc88f3 1295
e10aec8f
MD
1296 /*
1297 * Trace chunks in the registry may still exist if the session
1298 * daemon has encountered an internal error and could not
1299 * tear down its sessions and/or trace chunks properly.
1300 *
1301 * Release the session daemon's implicit reference to any remaining
1302 * trace chunk and print an error if any trace chunk was found. Note
1303 * that there are _no_ legitimate cases for trace chunks to be left,
1304 * it is a leak. However, it can happen following a crash of the
1305 * session daemon and not emptying the registry would cause an assertion
1306 * to hit.
1307 */
28ab034a
JG
1308 trace_chunks_left =
1309 lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
e10aec8f
MD
1310 if (trace_chunks_left) {
1311 ERR("%u trace chunks are leaked by lttng-consumerd. "
28ab034a
JG
1312 "This can be caused by an internal error of the session daemon.",
1313 trace_chunks_left);
e10aec8f
MD
1314 }
1315 /* Run all callbacks freeing each chunk. */
1316 rcu_barrier();
fa29bfbf 1317 lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
3bd1e081
MD
1318}
1319
1320/*
1321 * Called from signal handler.
1322 */
1323void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1324{
6cd525e8
MD
1325 ssize_t ret;
1326
10211f5c 1327 CMM_STORE_SHARED(consumer_quit, 1);
6cd525e8
MD
1328 ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
1329 if (ret < 1) {
7a57cf92 1330 PERROR("write consumer quit");
3bd1e081 1331 }
ab1027f4
DG
1332
1333 DBG("Consumer flag that it should quit");
3bd1e081
MD
1334}
1335
5199ffc4
JG
1336/*
1337 * Flush pending writes to trace output disk file.
1338 */
28ab034a 1339static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
3bd1e081
MD
1340{
1341 int outfd = stream->out_fd;
1342
1343 /*
1344 * This does a blocking write-and-wait on any page that belongs to the
1345 * subbuffer prior to the one we just wrote.
1346 * Don't care about error values, as these are just hints and ways to
1347 * limit the amount of page cache used.
1348 */
ffe60014 1349 if (orig_offset < stream->max_sb_size) {
3bd1e081
MD
1350 return;
1351 }
671e39d7
MJ
1352 lttng::io::hint_flush_range_dont_need_sync(
1353 outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
3bd1e081
MD
1354}
1355
1356/*
1357 * Initialise the necessary environnement :
1358 * - create a new context
1359 * - create the poll_pipe
1360 * - create the should_quit pipe (for signal handler)
1361 * - create the thread pipe (for splice)
1362 *
1363 * Takes a function pointer as argument, this function is called when data is
1364 * available on a buffer. This function is responsible to do the
1365 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1366 * buffer configuration and then kernctl_put_next_subbuf at the end.
1367 *
1368 * Returns a pointer to the new context or NULL on error.
1369 */
28ab034a
JG
1370struct lttng_consumer_local_data *
1371lttng_consumer_create(enum lttng_consumer_type type,
1372 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
1373 struct lttng_consumer_local_data *ctx,
1374 bool locked_by_caller),
1375 int (*recv_channel)(struct lttng_consumer_channel *channel),
1376 int (*recv_stream)(struct lttng_consumer_stream *stream),
1377 int (*update_stream)(uint64_t stream_key, uint32_t state))
3bd1e081 1378{
d8ef542d 1379 int ret;
3bd1e081
MD
1380 struct lttng_consumer_local_data *ctx;
1381
a0377dfe 1382 LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
28ab034a 1383 the_consumer_data.type == type);
fa29bfbf 1384 the_consumer_data.type = type;
3bd1e081 1385
64803277 1386 ctx = zmalloc<lttng_consumer_local_data>();
cd9adb8b 1387 if (ctx == nullptr) {
7a57cf92 1388 PERROR("allocating context");
3bd1e081
MD
1389 goto error;
1390 }
1391
1392 ctx->consumer_error_socket = -1;
331744e3 1393 ctx->consumer_metadata_socket = -1;
cd9adb8b 1394 pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
3bd1e081
MD
1395 /* assign the callbacks */
1396 ctx->on_buffer_ready = buffer_ready;
1397 ctx->on_recv_channel = recv_channel;
1398 ctx->on_recv_stream = recv_stream;
1399 ctx->on_update_stream = update_stream;
1400
acdb9057
DG
1401 ctx->consumer_data_pipe = lttng_pipe_open(0);
1402 if (!ctx->consumer_data_pipe) {
3bd1e081
MD
1403 goto error_poll_pipe;
1404 }
1405
02b3d176
DG
1406 ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
1407 if (!ctx->consumer_wakeup_pipe) {
1408 goto error_wakeup_pipe;
1409 }
1410
3bd1e081
MD
1411 ret = pipe(ctx->consumer_should_quit);
1412 if (ret < 0) {
7a57cf92 1413 PERROR("Error creating recv pipe");
3bd1e081
MD
1414 goto error_quit_pipe;
1415 }
1416
d8ef542d
MD
1417 ret = pipe(ctx->consumer_channel_pipe);
1418 if (ret < 0) {
1419 PERROR("Error creating channel pipe");
1420 goto error_channel_pipe;
1421 }
1422
13886d2d
DG
1423 ctx->consumer_metadata_pipe = lttng_pipe_open(0);
1424 if (!ctx->consumer_metadata_pipe) {
fb3a43a9
DG
1425 goto error_metadata_pipe;
1426 }
3bd1e081 1427
e9404c27
JG
1428 ctx->channel_monitor_pipe = -1;
1429
fb3a43a9 1430 return ctx;
3bd1e081 1431
fb3a43a9 1432error_metadata_pipe:
d8ef542d
MD
1433 utils_close_pipe(ctx->consumer_channel_pipe);
1434error_channel_pipe:
d8ef542d 1435 utils_close_pipe(ctx->consumer_should_quit);
3bd1e081 1436error_quit_pipe:
02b3d176
DG
1437 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1438error_wakeup_pipe:
acdb9057 1439 lttng_pipe_destroy(ctx->consumer_data_pipe);
3bd1e081
MD
1440error_poll_pipe:
1441 free(ctx);
1442error:
cd9adb8b 1443 return nullptr;
3bd1e081
MD
1444}
1445
282dadbc
MD
1446/*
1447 * Iterate over all streams of the hashtable and free them properly.
1448 */
1449static void destroy_data_stream_ht(struct lttng_ht *ht)
1450{
1451 struct lttng_ht_iter iter;
1452 struct lttng_consumer_stream *stream;
1453
cd9adb8b 1454 if (ht == nullptr) {
282dadbc
MD
1455 return;
1456 }
1457
56047f5a
JG
1458 {
1459 lttng::urcu::read_lock_guard read_lock;
1460 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1461 /*
1462 * Ignore return value since we are currently cleaning up so any error
1463 * can't be handled.
1464 */
1465 (void) consumer_del_stream(stream, ht);
1466 }
282dadbc 1467 }
282dadbc
MD
1468
1469 lttng_ht_destroy(ht);
1470}
1471
1472/*
1473 * Iterate over all streams of the metadata hashtable and free them
1474 * properly.
1475 */
1476static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1477{
1478 struct lttng_ht_iter iter;
1479 struct lttng_consumer_stream *stream;
1480
cd9adb8b 1481 if (ht == nullptr) {
282dadbc
MD
1482 return;
1483 }
1484
56047f5a
JG
1485 {
1486 lttng::urcu::read_lock_guard read_lock;
1487 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1488 /*
1489 * Ignore return value since we are currently cleaning up so any error
1490 * can't be handled.
1491 */
1492 (void) consumer_del_metadata_stream(stream, ht);
1493 }
282dadbc 1494 }
282dadbc
MD
1495
1496 lttng_ht_destroy(ht);
1497}
1498
3bd1e081
MD
1499/*
1500 * Close all fds associated with the instance and free the context.
1501 */
1502void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1503{
4c462e79
MD
1504 int ret;
1505
ab1027f4
DG
1506 DBG("Consumer destroying it. Closing everything.");
1507
4f2e75b9
DG
1508 if (!ctx) {
1509 return;
1510 }
1511
282dadbc
MD
1512 destroy_data_stream_ht(data_ht);
1513 destroy_metadata_stream_ht(metadata_ht);
1514
4c462e79
MD
1515 ret = close(ctx->consumer_error_socket);
1516 if (ret) {
1517 PERROR("close");
1518 }
331744e3
JD
1519 ret = close(ctx->consumer_metadata_socket);
1520 if (ret) {
1521 PERROR("close");
1522 }
d8ef542d 1523 utils_close_pipe(ctx->consumer_channel_pipe);
acdb9057 1524 lttng_pipe_destroy(ctx->consumer_data_pipe);
13886d2d 1525 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
02b3d176 1526 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
d8ef542d 1527 utils_close_pipe(ctx->consumer_should_quit);
fb3a43a9 1528
3bd1e081
MD
1529 unlink(ctx->consumer_command_sock_path);
1530 free(ctx);
1531}
1532
6197aea7
DG
1533/*
1534 * Write the metadata stream id on the specified file descriptor.
1535 */
28ab034a
JG
1536static int
1537write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
6197aea7 1538{
6cd525e8 1539 ssize_t ret;
1d4dfdef 1540 struct lttcomm_relayd_metadata_payload hdr;
6197aea7 1541
1d4dfdef
DG
1542 hdr.stream_id = htobe64(stream->relayd_stream_id);
1543 hdr.padding_size = htobe32(padding);
6cd525e8
MD
1544 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1545 if (ret < sizeof(hdr)) {
d7b75ec8 1546 /*
6f04ed72 1547 * This error means that the fd's end is closed so ignore the PERROR
d7b75ec8
DG
1548 * not to clubber the error output since this can happen in a normal
1549 * code path.
1550 */
1551 if (errno != EPIPE) {
1552 PERROR("write metadata stream id");
1553 }
1554 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
534d2592
DG
1555 /*
1556 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1557 * handle writting the missing part so report that as an error and
1558 * don't lie to the caller.
1559 */
1560 ret = -1;
6197aea7
DG
1561 goto end;
1562 }
1d4dfdef 1563 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
28ab034a
JG
1564 stream->relayd_stream_id,
1565 padding);
6197aea7
DG
1566
1567end:
6cd525e8 1568 return (int) ret;
6197aea7
DG
1569}
1570
3bd1e081 1571/*
09e26845
DG
1572 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1573 * core function for writing trace buffers to either the local filesystem or
1574 * the network.
1575 *
d2956687 1576 * It must be called with the stream and the channel lock held.
79d4ffb7 1577 *
09e26845 1578 * Careful review MUST be put if any changes occur!
3bd1e081
MD
1579 *
1580 * Returns the number of bytes written
1581 */
28ab034a
JG
1582ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
1583 const struct lttng_buffer_view *buffer,
1584 unsigned long padding)
3bd1e081 1585{
994ab360 1586 ssize_t ret = 0;
f02e1e8a
DG
1587 off_t orig_offset = stream->out_fd_offset;
1588 /* Default is on the disk */
1589 int outfd = stream->out_fd;
cd9adb8b 1590 struct consumer_relayd_sock_pair *relayd = nullptr;
8994307f 1591 unsigned int relayd_hang_up = 0;
fd424d99
JG
1592 const size_t subbuf_content_size = buffer->size - padding;
1593 size_t write_len;
f02e1e8a
DG
1594
1595 /* RCU lock for the relayd pointer */
56047f5a 1596 lttng::urcu::read_lock_guard read_lock;
28ab034a 1597 LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
d2956687 1598
f02e1e8a 1599 /* Flag that the current stream if set for network streaming. */
da009f2c 1600 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1601 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1602 if (relayd == nullptr) {
56591bac 1603 ret = -EPIPE;
f02e1e8a
DG
1604 goto end;
1605 }
1606 }
1607
f02e1e8a
DG
1608 /* Handle stream on the relayd if the output is on the network */
1609 if (relayd) {
fd424d99 1610 unsigned long netlen = subbuf_content_size;
f02e1e8a
DG
1611
1612 /*
1613 * Lock the control socket for the complete duration of the function
1614 * since from this point on we will use the socket.
1615 */
1616 if (stream->metadata_flag) {
1617 /* Metadata requires the control socket. */
1618 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
93ec662e
JD
1619 if (stream->reset_metadata_flag) {
1620 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1621 stream->relayd_stream_id,
1622 stream->metadata_version);
93ec662e
JD
1623 if (ret < 0) {
1624 relayd_hang_up = 1;
1625 goto write_error;
1626 }
1627 stream->reset_metadata_flag = 0;
1628 }
1d4dfdef 1629 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
f02e1e8a
DG
1630 }
1631
1d4dfdef 1632 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
994ab360
DG
1633 if (ret < 0) {
1634 relayd_hang_up = 1;
1635 goto write_error;
1636 }
1637 /* Use the returned socket. */
1638 outfd = ret;
f02e1e8a 1639
994ab360
DG
1640 /* Write metadata stream id before payload */
1641 if (stream->metadata_flag) {
239f61af 1642 ret = write_relayd_metadata_id(outfd, stream, padding);
994ab360 1643 if (ret < 0) {
8994307f
DG
1644 relayd_hang_up = 1;
1645 goto write_error;
1646 }
f02e1e8a 1647 }
1624d5b7 1648
fd424d99
JG
1649 write_len = subbuf_content_size;
1650 } else {
1651 /* No streaming; we have to write the full padding. */
93ec662e
JD
1652 if (stream->metadata_flag && stream->reset_metadata_flag) {
1653 ret = utils_truncate_stream_file(stream->out_fd, 0);
1654 if (ret < 0) {
1655 ERR("Reset metadata file");
1656 goto end;
1657 }
1658 stream->reset_metadata_flag = 0;
1659 }
1660
1624d5b7
JD
1661 /*
1662 * Check if we need to change the tracefile before writing the packet.
1663 */
1664 if (stream->chan->tracefile_size > 0 &&
28ab034a
JG
1665 (stream->tracefile_size_current + buffer->size) >
1666 stream->chan->tracefile_size) {
d2956687
JG
1667 ret = consumer_stream_rotate_output_files(stream);
1668 if (ret) {
1624d5b7
JD
1669 goto end;
1670 }
309167d2 1671 outfd = stream->out_fd;
a1ae300f 1672 orig_offset = 0;
1624d5b7 1673 }
fd424d99 1674 stream->tracefile_size_current += buffer->size;
fd424d99 1675 write_len = buffer->size;
f02e1e8a
DG
1676 }
1677
d02b8372
DG
1678 /*
1679 * This call guarantee that len or less is returned. It's impossible to
1680 * receive a ret value that is bigger than len.
1681 */
fd424d99 1682 ret = lttng_write(outfd, buffer->data, write_len);
e2d1190b 1683 DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
fd424d99 1684 if (ret < 0 || ((size_t) ret != write_len)) {
d02b8372
DG
1685 /*
1686 * Report error to caller if nothing was written else at least send the
1687 * amount written.
1688 */
1689 if (ret < 0) {
994ab360 1690 ret = -errno;
f02e1e8a 1691 }
994ab360 1692 relayd_hang_up = 1;
f02e1e8a 1693
d02b8372 1694 /* Socket operation failed. We consider the relayd dead */
fcf0f774 1695 if (errno == EPIPE) {
d02b8372
DG
1696 /*
1697 * This is possible if the fd is closed on the other side
1698 * (outfd) or any write problem. It can be verbose a bit for a
1699 * normal execution if for instance the relayd is stopped
1700 * abruptly. This can happen so set this to a DBG statement.
1701 */
1702 DBG("Consumer mmap write detected relayd hang up");
994ab360
DG
1703 } else {
1704 /* Unhandled error, print it and stop function right now. */
28ab034a 1705 PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
f02e1e8a 1706 }
994ab360 1707 goto write_error;
d02b8372
DG
1708 }
1709 stream->output_written += ret;
d02b8372
DG
1710
1711 /* This call is useless on a socket so better save a syscall. */
1712 if (!relayd) {
1713 /* This won't block, but will start writeout asynchronously */
671e39d7 1714 lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
fd424d99 1715 stream->out_fd_offset += write_len;
f5dbe415 1716 lttng_consumer_sync_trace_file(stream, orig_offset);
f02e1e8a 1717 }
f02e1e8a 1718
8994307f
DG
1719write_error:
1720 /*
1721 * This is a special case that the relayd has closed its socket. Let's
1722 * cleanup the relayd object and all associated streams.
1723 */
1724 if (relayd && relayd_hang_up) {
28ab034a 1725 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1726 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1727 }
1728
f02e1e8a
DG
1729end:
1730 /* Unlock only if ctrl socket used */
1731 if (relayd && stream->metadata_flag) {
1732 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1733 }
1734
994ab360 1735 return ret;
3bd1e081
MD
1736}
1737
1738/*
1739 * Splice the data from the ring buffer to the tracefile.
1740 *
79d4ffb7
DG
1741 * It must be called with the stream lock held.
1742 *
3bd1e081
MD
1743 * Returns the number of bytes spliced.
1744 */
28ab034a
JG
1745ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
1746 struct lttng_consumer_stream *stream,
1747 unsigned long len,
1748 unsigned long padding)
3bd1e081 1749{
f02e1e8a
DG
1750 ssize_t ret = 0, written = 0, ret_splice = 0;
1751 loff_t offset = 0;
1752 off_t orig_offset = stream->out_fd_offset;
1753 int fd = stream->wait_fd;
1754 /* Default is on the disk */
1755 int outfd = stream->out_fd;
cd9adb8b 1756 struct consumer_relayd_sock_pair *relayd = nullptr;
fb3a43a9 1757 int *splice_pipe;
8994307f 1758 unsigned int relayd_hang_up = 0;
f02e1e8a 1759
fa29bfbf 1760 switch (the_consumer_data.type) {
3bd1e081 1761 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1762 break;
7753dea8
MD
1763 case LTTNG_CONSUMER32_UST:
1764 case LTTNG_CONSUMER64_UST:
f02e1e8a 1765 /* Not supported for user space tracing */
3bd1e081
MD
1766 return -ENOSYS;
1767 default:
1768 ERR("Unknown consumer_data type");
a0377dfe 1769 abort();
3bd1e081
MD
1770 }
1771
f02e1e8a 1772 /* RCU lock for the relayd pointer */
56047f5a 1773 lttng::urcu::read_lock_guard read_lock;
f02e1e8a
DG
1774
1775 /* Flag that the current stream if set for network streaming. */
da009f2c 1776 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a 1777 relayd = consumer_find_relayd(stream->net_seq_idx);
cd9adb8b 1778 if (relayd == nullptr) {
ad0b0d23 1779 written = -ret;
f02e1e8a
DG
1780 goto end;
1781 }
1782 }
a2361a61 1783 splice_pipe = stream->splice_pipe;
fb3a43a9 1784
f02e1e8a 1785 /* Write metadata stream id before payload */
1d4dfdef 1786 if (relayd) {
ad0b0d23 1787 unsigned long total_len = len;
f02e1e8a 1788
1d4dfdef
DG
1789 if (stream->metadata_flag) {
1790 /*
1791 * Lock the control socket for the complete duration of the function
1792 * since from this point on we will use the socket.
1793 */
1794 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1795
93ec662e
JD
1796 if (stream->reset_metadata_flag) {
1797 ret = relayd_reset_metadata(&relayd->control_sock,
28ab034a
JG
1798 stream->relayd_stream_id,
1799 stream->metadata_version);
93ec662e
JD
1800 if (ret < 0) {
1801 relayd_hang_up = 1;
1802 goto write_error;
1803 }
1804 stream->reset_metadata_flag = 0;
1805 }
28ab034a 1806 ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
1d4dfdef
DG
1807 if (ret < 0) {
1808 written = ret;
ad0b0d23
DG
1809 relayd_hang_up = 1;
1810 goto write_error;
1d4dfdef
DG
1811 }
1812
1813 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1814 }
1815
1816 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
ad0b0d23
DG
1817 if (ret < 0) {
1818 written = ret;
1819 relayd_hang_up = 1;
1820 goto write_error;
f02e1e8a 1821 }
ad0b0d23
DG
1822 /* Use the returned socket. */
1823 outfd = ret;
1d4dfdef
DG
1824 } else {
1825 /* No streaming, we have to set the len with the full padding */
1826 len += padding;
1624d5b7 1827
93ec662e
JD
1828 if (stream->metadata_flag && stream->reset_metadata_flag) {
1829 ret = utils_truncate_stream_file(stream->out_fd, 0);
1830 if (ret < 0) {
1831 ERR("Reset metadata file");
1832 goto end;
1833 }
1834 stream->reset_metadata_flag = 0;
1835 }
1624d5b7
JD
1836 /*
1837 * Check if we need to change the tracefile before writing the packet.
1838 */
1839 if (stream->chan->tracefile_size > 0 &&
28ab034a 1840 (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
d2956687 1841 ret = consumer_stream_rotate_output_files(stream);
1624d5b7 1842 if (ret < 0) {
ad0b0d23 1843 written = ret;
1624d5b7
JD
1844 goto end;
1845 }
309167d2 1846 outfd = stream->out_fd;
a1ae300f 1847 orig_offset = 0;
1624d5b7
JD
1848 }
1849 stream->tracefile_size_current += len;
f02e1e8a
DG
1850 }
1851
1852 while (len > 0) {
1d4dfdef 1853 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
28ab034a
JG
1854 (unsigned long) offset,
1855 len,
1856 fd,
1857 splice_pipe[1]);
1858 ret_splice = splice(
cd9adb8b 1859 fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
f02e1e8a
DG
1860 DBG("splice chan to pipe, ret %zd", ret_splice);
1861 if (ret_splice < 0) {
d02b8372 1862 ret = errno;
ad0b0d23 1863 written = -ret;
d02b8372 1864 PERROR("Error in relay splice");
f02e1e8a
DG
1865 goto splice_error;
1866 }
1867
1868 /* Handle stream on the relayd if the output is on the network */
ad0b0d23
DG
1869 if (relayd && stream->metadata_flag) {
1870 size_t metadata_payload_size =
1871 sizeof(struct lttcomm_relayd_metadata_payload);
1872
1873 /* Update counter to fit the spliced data */
1874 ret_splice += metadata_payload_size;
1875 len += metadata_payload_size;
1876 /*
1877 * We do this so the return value can match the len passed as
1878 * argument to this function.
1879 */
1880 written -= metadata_payload_size;
f02e1e8a
DG
1881 }
1882
1883 /* Splice data out */
28ab034a 1884 ret_splice = splice(splice_pipe[0],
cd9adb8b 1885 nullptr,
28ab034a 1886 outfd,
cd9adb8b 1887 nullptr,
28ab034a
JG
1888 ret_splice,
1889 SPLICE_F_MOVE | SPLICE_F_MORE);
1890 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
f02e1e8a 1891 if (ret_splice < 0) {
d02b8372 1892 ret = errno;
ad0b0d23
DG
1893 written = -ret;
1894 relayd_hang_up = 1;
1895 goto write_error;
f02e1e8a 1896 } else if (ret_splice > len) {
d02b8372
DG
1897 /*
1898 * We don't expect this code path to be executed but you never know
1899 * so this is an extra protection agains a buggy splice().
1900 */
f02e1e8a 1901 ret = errno;
ad0b0d23 1902 written += ret_splice;
28ab034a 1903 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
f02e1e8a 1904 goto splice_error;
d02b8372
DG
1905 } else {
1906 /* All good, update current len and continue. */
1907 len -= ret_splice;
f02e1e8a 1908 }
f02e1e8a
DG
1909
1910 /* This call is useless on a socket so better save a syscall. */
1911 if (!relayd) {
1912 /* This won't block, but will start writeout asynchronously */
671e39d7 1913 lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
f02e1e8a
DG
1914 stream->out_fd_offset += ret_splice;
1915 }
e5d1a9b3 1916 stream->output_written += ret_splice;
f02e1e8a
DG
1917 written += ret_splice;
1918 }
f5dbe415
JG
1919 if (!relayd) {
1920 lttng_consumer_sync_trace_file(stream, orig_offset);
1921 }
f02e1e8a
DG
1922 goto end;
1923
8994307f
DG
1924write_error:
1925 /*
1926 * This is a special case that the relayd has closed its socket. Let's
1927 * cleanup the relayd object and all associated streams.
1928 */
1929 if (relayd && relayd_hang_up) {
28ab034a 1930 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
9276e5c8 1931 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1932 /* Skip splice error so the consumer does not fail */
1933 goto end;
1934 }
1935
f02e1e8a
DG
1936splice_error:
1937 /* send the appropriate error description to sessiond */
1938 switch (ret) {
f02e1e8a 1939 case EINVAL:
f73fabfd 1940 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
1941 break;
1942 case ENOMEM:
f73fabfd 1943 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
1944 break;
1945 case ESPIPE:
f73fabfd 1946 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
1947 break;
1948 }
1949
1950end:
1951 if (relayd && stream->metadata_flag) {
1952 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1953 }
1954
f02e1e8a 1955 return written;
3bd1e081
MD
1956}
1957
15055ce5
JD
1958/*
1959 * Sample the snapshot positions for a specific fd
1960 *
1961 * Returns 0 on success, < 0 on error
1962 */
1963int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
1964{
fa29bfbf 1965 switch (the_consumer_data.type) {
15055ce5
JD
1966 case LTTNG_CONSUMER_KERNEL:
1967 return lttng_kconsumer_sample_snapshot_positions(stream);
1968 case LTTNG_CONSUMER32_UST:
1969 case LTTNG_CONSUMER64_UST:
1970 return lttng_ustconsumer_sample_snapshot_positions(stream);
1971 default:
1972 ERR("Unknown consumer_data type");
a0377dfe 1973 abort();
15055ce5
JD
1974 return -ENOSYS;
1975 }
1976}
3bd1e081
MD
1977/*
1978 * Take a snapshot for a specific fd
1979 *
1980 * Returns 0 on success, < 0 on error
1981 */
ffe60014 1982int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 1983{
fa29bfbf 1984 switch (the_consumer_data.type) {
3bd1e081 1985 case LTTNG_CONSUMER_KERNEL:
ffe60014 1986 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
1987 case LTTNG_CONSUMER32_UST:
1988 case LTTNG_CONSUMER64_UST:
ffe60014 1989 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
1990 default:
1991 ERR("Unknown consumer_data type");
a0377dfe 1992 abort();
3bd1e081
MD
1993 return -ENOSYS;
1994 }
3bd1e081
MD
1995}
1996
1997/*
1998 * Get the produced position
1999 *
2000 * Returns 0 on success, < 0 on error
2001 */
28ab034a 2002int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 2003{
fa29bfbf 2004 switch (the_consumer_data.type) {
3bd1e081 2005 case LTTNG_CONSUMER_KERNEL:
ffe60014 2006 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
2007 case LTTNG_CONSUMER32_UST:
2008 case LTTNG_CONSUMER64_UST:
ffe60014 2009 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
2010 default:
2011 ERR("Unknown consumer_data type");
a0377dfe 2012 abort();
3bd1e081
MD
2013 return -ENOSYS;
2014 }
2015}
2016
15055ce5
JD
2017/*
2018 * Get the consumed position (free-running counter position in bytes).
2019 *
2020 * Returns 0 on success, < 0 on error
2021 */
28ab034a 2022int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
15055ce5 2023{
fa29bfbf 2024 switch (the_consumer_data.type) {
15055ce5
JD
2025 case LTTNG_CONSUMER_KERNEL:
2026 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2027 case LTTNG_CONSUMER32_UST:
2028 case LTTNG_CONSUMER64_UST:
2029 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2030 default:
2031 ERR("Unknown consumer_data type");
a0377dfe 2032 abort();
15055ce5
JD
2033 return -ENOSYS;
2034 }
2035}
2036
3bd1e081 2037int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
28ab034a
JG
2038 int sock,
2039 struct pollfd *consumer_sockpoll)
3bd1e081 2040{
fa29bfbf 2041 switch (the_consumer_data.type) {
3bd1e081
MD
2042 case LTTNG_CONSUMER_KERNEL:
2043 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
2044 case LTTNG_CONSUMER32_UST:
2045 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
2046 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2047 default:
2048 ERR("Unknown consumer_data type");
a0377dfe 2049 abort();
3bd1e081
MD
2050 return -ENOSYS;
2051 }
2052}
2053
cd9adb8b 2054static void lttng_consumer_close_all_metadata()
d88aee68 2055{
fa29bfbf 2056 switch (the_consumer_data.type) {
d88aee68
DG
2057 case LTTNG_CONSUMER_KERNEL:
2058 /*
2059 * The Kernel consumer has a different metadata scheme so we don't
2060 * close anything because the stream will be closed by the session
2061 * daemon.
2062 */
2063 break;
2064 case LTTNG_CONSUMER32_UST:
2065 case LTTNG_CONSUMER64_UST:
2066 /*
2067 * Close all metadata streams. The metadata hash table is passed and
2068 * this call iterates over it by closing all wakeup fd. This is safe
2069 * because at this point we are sure that the metadata producer is
2070 * either dead or blocked.
2071 */
6d574024 2072 lttng_ustconsumer_close_all_metadata(metadata_ht);
d88aee68
DG
2073 break;
2074 default:
2075 ERR("Unknown consumer_data type");
a0377dfe 2076 abort();
d88aee68
DG
2077 }
2078}
2079
fb3a43a9
DG
2080/*
2081 * Clean up a metadata stream and free its memory.
2082 */
28ab034a 2083void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
fb3a43a9 2084{
cd9adb8b 2085 struct lttng_consumer_channel *channel = nullptr;
a6ef8ee6 2086 bool free_channel = false;
fb3a43a9 2087
a0377dfe 2088 LTTNG_ASSERT(stream);
fb3a43a9
DG
2089 /*
2090 * This call should NEVER receive regular stream. It must always be
2091 * metadata stream and this is crucial for data structure synchronization.
2092 */
a0377dfe 2093 LTTNG_ASSERT(stream->metadata_flag);
fb3a43a9 2094
e316aad5
DG
2095 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
2096
fa29bfbf 2097 pthread_mutex_lock(&the_consumer_data.lock);
a6ef8ee6
JG
2098 /*
2099 * Note that this assumes that a stream's channel is never changed and
2100 * that the stream's lock doesn't need to be taken to sample its
2101 * channel.
2102 */
2103 channel = stream->chan;
2104 pthread_mutex_lock(&channel->lock);
3dad2c0f 2105 pthread_mutex_lock(&stream->lock);
a6ef8ee6 2106 if (channel->metadata_cache) {
081424af 2107 /* Only applicable to userspace consumers. */
a6ef8ee6 2108 pthread_mutex_lock(&channel->metadata_cache->lock);
081424af 2109 }
8994307f 2110
6d574024
DG
2111 /* Remove any reference to that stream. */
2112 consumer_stream_delete(stream, ht);
ca22feea 2113
6d574024 2114 /* Close down everything including the relayd if one. */
d119bd01 2115 consumer_stream_close_output(stream);
6d574024
DG
2116 /* Destroy tracer buffers of the stream. */
2117 consumer_stream_destroy_buffers(stream);
fb3a43a9
DG
2118
2119 /* Atomically decrement channel refcount since other threads can use it. */
28ab034a
JG
2120 if (!uatomic_sub_return(&channel->refcount, 1) &&
2121 !uatomic_read(&channel->nb_init_stream_left)) {
c30aaa51 2122 /* Go for channel deletion! */
a6ef8ee6 2123 free_channel = true;
fb3a43a9 2124 }
cd9adb8b 2125 stream->chan = nullptr;
fb3a43a9 2126
73811ecc
DG
2127 /*
2128 * Nullify the stream reference so it is not used after deletion. The
6d574024
DG
2129 * channel lock MUST be acquired before being able to check for a NULL
2130 * pointer value.
73811ecc 2131 */
cd9adb8b 2132 channel->metadata_stream = nullptr;
73811ecc 2133
a6ef8ee6
JG
2134 if (channel->metadata_cache) {
2135 pthread_mutex_unlock(&channel->metadata_cache->lock);
081424af 2136 }
3dad2c0f 2137 pthread_mutex_unlock(&stream->lock);
a6ef8ee6 2138 pthread_mutex_unlock(&channel->lock);
fa29bfbf 2139 pthread_mutex_unlock(&the_consumer_data.lock);
e316aad5 2140
a6ef8ee6
JG
2141 if (free_channel) {
2142 consumer_del_channel(channel);
e316aad5
DG
2143 }
2144
d2956687 2145 lttng_trace_chunk_put(stream->trace_chunk);
cd9adb8b 2146 stream->trace_chunk = nullptr;
6d574024 2147 consumer_stream_free(stream);
fb3a43a9
DG
2148}
2149
2150/*
2151 * Action done with the metadata stream when adding it to the consumer internal
2152 * data structures to handle it.
2153 */
66d583dc 2154void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
fb3a43a9 2155{
5ab66908 2156 struct lttng_ht *ht = metadata_ht;
76082088 2157 struct lttng_ht_iter iter;
d88aee68 2158 struct lttng_ht_node_u64 *node;
fb3a43a9 2159
a0377dfe
FD
2160 LTTNG_ASSERT(stream);
2161 LTTNG_ASSERT(ht);
e316aad5 2162
d88aee68 2163 DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
e316aad5 2164
fa29bfbf 2165 pthread_mutex_lock(&the_consumer_data.lock);
a9838785 2166 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 2167 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 2168 pthread_mutex_lock(&stream->lock);
e316aad5 2169
e316aad5
DG
2170 /*
2171 * From here, refcounts are updated so be _careful_ when returning an error
2172 * after this point.
2173 */
2174
56047f5a 2175 lttng::urcu::read_lock_guard read_lock;
76082088
DG
2176
2177 /*
2178 * Lookup the stream just to make sure it does not exist in our internal
2179 * state. This should NEVER happen.
2180 */
d88aee68
DG
2181 lttng_ht_lookup(ht, &stream->key, &iter);
2182 node = lttng_ht_iter_get_node_u64(&iter);
a0377dfe 2183 LTTNG_ASSERT(!node);
76082088 2184
e316aad5 2185 /*
ffe60014
DG
2186 * When nb_init_stream_left reaches 0, we don't need to trigger any action
2187 * in terms of destroying the associated channel, because the action that
e316aad5
DG
2188 * causes the count to become 0 also causes a stream to be added. The
2189 * channel deletion will thus be triggered by the following removal of this
2190 * stream.
2191 */
ffe60014 2192 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
2193 /* Increment refcount before decrementing nb_init_stream_left */
2194 cmm_smp_wmb();
ffe60014 2195 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
2196 }
2197
d88aee68 2198 lttng_ht_add_unique_u64(ht, &stream->node);
ca22feea 2199
28ab034a 2200 lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
d8ef542d 2201
ca22feea
DG
2202 /*
2203 * Add stream to the stream_list_ht of the consumer data. No need to steal
2204 * the key since the HT does not use it and we allow to add redundant keys
2205 * into this table.
2206 */
28ab034a 2207 lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 2208
2e818a6a 2209 pthread_mutex_unlock(&stream->lock);
a9838785 2210 pthread_mutex_unlock(&stream->chan->lock);
ec6ea7d0 2211 pthread_mutex_unlock(&stream->chan->timer_lock);
fa29bfbf 2212 pthread_mutex_unlock(&the_consumer_data.lock);
fb3a43a9
DG
2213}
2214
8994307f
DG
2215/*
2216 * Delete data stream that are flagged for deletion (endpoint_status).
2217 */
cd9adb8b 2218static void validate_endpoint_status_data_stream()
8994307f
DG
2219{
2220 struct lttng_ht_iter iter;
2221 struct lttng_consumer_stream *stream;
2222
2223 DBG("Consumer delete flagged data stream");
2224
56047f5a
JG
2225 {
2226 lttng::urcu::read_lock_guard read_lock;
2227
2228 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
2229 /* Validate delete flag of the stream */
2230 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2231 continue;
2232 }
2233 /* Delete it right now */
2234 consumer_del_stream(stream, data_ht);
8994307f 2235 }
8994307f 2236 }
8994307f
DG
2237}
2238
2239/*
2240 * Delete metadata stream that are flagged for deletion (endpoint_status).
2241 */
28ab034a 2242static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
8994307f
DG
2243{
2244 struct lttng_ht_iter iter;
2245 struct lttng_consumer_stream *stream;
2246
2247 DBG("Consumer delete flagged metadata stream");
2248
a0377dfe 2249 LTTNG_ASSERT(pollset);
8994307f 2250
56047f5a
JG
2251 {
2252 lttng::urcu::read_lock_guard read_lock;
2253 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
2254 /* Validate delete flag of the stream */
2255 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2256 continue;
2257 }
2258 /*
2259 * Remove from pollset so the metadata thread can continue without
2260 * blocking on a deleted stream.
2261 */
2262 lttng_poll_del(pollset, stream->wait_fd);
8994307f 2263
56047f5a
JG
2264 /* Delete it right now */
2265 consumer_del_metadata_stream(stream, metadata_ht);
2266 }
8994307f 2267 }
8994307f
DG
2268}
2269
fb3a43a9
DG
2270/*
2271 * Thread polls on metadata file descriptor and write them on disk or on the
2272 * network.
2273 */
7d980def 2274void *consumer_thread_metadata_poll(void *data)
fb3a43a9 2275{
1fc79fb4 2276 int ret, i, pollfd, err = -1;
fb3a43a9 2277 uint32_t revents, nb_fd;
cd9adb8b 2278 struct lttng_consumer_stream *stream = nullptr;
fb3a43a9 2279 struct lttng_ht_iter iter;
d88aee68 2280 struct lttng_ht_node_u64 *node;
fb3a43a9 2281 struct lttng_poll_event events;
97535efa 2282 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
fb3a43a9
DG
2283 ssize_t len;
2284
2285 rcu_register_thread();
2286
1fc79fb4
MD
2287 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2288
2d57de81
MD
2289 if (testpoint(consumerd_thread_metadata)) {
2290 goto error_testpoint;
2291 }
2292
9ce5646a
MD
2293 health_code_update();
2294
fb3a43a9
DG
2295 DBG("Thread metadata poll started");
2296
fb3a43a9
DG
2297 /* Size is set to 1 for the consumer_metadata pipe */
2298 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2299 if (ret < 0) {
2300 ERR("Poll set creation failed");
d8ef542d 2301 goto end_poll;
fb3a43a9
DG
2302 }
2303
28ab034a 2304 ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
fb3a43a9
DG
2305 if (ret < 0) {
2306 goto end;
2307 }
2308
2309 /* Main loop */
2310 DBG("Metadata main loop started");
2311
cd9adb8b 2312 while (true) {
28ab034a 2313 restart:
7fa2082e 2314 health_code_update();
9ce5646a 2315 health_poll_entry();
7fa2082e 2316 DBG("Metadata poll wait");
fb3a43a9 2317 ret = lttng_poll_wait(&events, -1);
28ab034a 2318 DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2319 health_poll_exit();
40063ead 2320 DBG("Metadata event caught in thread");
fb3a43a9
DG
2321 if (ret < 0) {
2322 if (errno == EINTR) {
40063ead 2323 ERR("Poll EINTR caught");
fb3a43a9
DG
2324 goto restart;
2325 }
d9607cd7 2326 if (LTTNG_POLL_GETNB(&events) == 0) {
28ab034a 2327 err = 0; /* All is OK */
d9607cd7
MD
2328 }
2329 goto end;
fb3a43a9
DG
2330 }
2331
0d9c5d77
DG
2332 nb_fd = ret;
2333
e316aad5 2334 /* From here, the event is a metadata wait fd */
fb3a43a9 2335 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2336 health_code_update();
2337
fb3a43a9
DG
2338 revents = LTTNG_POLL_GETEV(&events, i);
2339 pollfd = LTTNG_POLL_GETFD(&events, i);
2340
13886d2d 2341 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
03e43155 2342 if (revents & LPOLLIN) {
13886d2d
DG
2343 ssize_t pipe_len;
2344
2345 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
28ab034a 2346 &stream,
5c7248cd
JG
2347 sizeof(stream)); /* NOLINT sizeof
2348 used on a
2349 pointer. */
2350 if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
2351 pointer. */
03e43155
MD
2352 if (pipe_len < 0) {
2353 PERROR("read metadata stream");
2354 }
fb3a43a9 2355 /*
28ab034a
JG
2356 * Remove the pipe from the poll set and continue
2357 * the loop since their might be data to consume.
fb3a43a9 2358 */
28ab034a
JG
2359 lttng_poll_del(
2360 &events,
2361 lttng_pipe_get_readfd(
2362 ctx->consumer_metadata_pipe));
03e43155 2363 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
fb3a43a9
DG
2364 continue;
2365 }
2366
8994307f 2367 /* A NULL stream means that the state has changed. */
cd9adb8b 2368 if (stream == nullptr) {
8994307f
DG
2369 /* Check for deleted streams. */
2370 validate_endpoint_status_metadata_stream(&events);
3714380f 2371 goto restart;
8994307f
DG
2372 }
2373
fb3a43a9 2374 DBG("Adding metadata stream %d to poll set",
28ab034a 2375 stream->wait_fd);
fb3a43a9 2376
fb3a43a9 2377 /* Add metadata stream to the global poll events list */
28ab034a
JG
2378 lttng_poll_add(
2379 &events, stream->wait_fd, LPOLLIN | LPOLLPRI);
2380 } else if (revents & (LPOLLERR | LPOLLHUP)) {
03e43155
MD
2381 DBG("Metadata thread pipe hung up");
2382 /*
2383 * Remove the pipe from the poll set and continue the loop
2384 * since their might be data to consume.
2385 */
28ab034a
JG
2386 lttng_poll_del(
2387 &events,
2388 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
03e43155
MD
2389 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2390 continue;
2391 } else {
28ab034a
JG
2392 ERR("Unexpected poll events %u for sock %d",
2393 revents,
2394 pollfd);
03e43155 2395 goto end;
fb3a43a9
DG
2396 }
2397
e316aad5 2398 /* Handle other stream */
fb3a43a9
DG
2399 continue;
2400 }
2401
56047f5a 2402 lttng::urcu::read_lock_guard read_lock;
d88aee68
DG
2403 {
2404 uint64_t tmp_id = (uint64_t) pollfd;
2405
2406 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2407 }
2408 node = lttng_ht_iter_get_node_u64(&iter);
a0377dfe 2409 LTTNG_ASSERT(node);
fb3a43a9 2410
28ab034a 2411 stream = caa_container_of(node, struct lttng_consumer_stream, node);
fb3a43a9 2412
03e43155
MD
2413 if (revents & (LPOLLIN | LPOLLPRI)) {
2414 /* Get the data out of the metadata file descriptor */
2415 DBG("Metadata available on fd %d", pollfd);
a0377dfe 2416 LTTNG_ASSERT(stream->wait_fd == pollfd);
03e43155
MD
2417
2418 do {
2419 health_code_update();
2420
6f9449c2 2421 len = ctx->on_buffer_ready(stream, ctx, false);
03e43155
MD
2422 /*
2423 * We don't check the return value here since if we get
83f4233d 2424 * a negative len, it means an error occurred thus we
03e43155
MD
2425 * simply remove it from the poll set and free the
2426 * stream.
2427 */
2428 } while (len > 0);
2429
2430 /* It's ok to have an unavailable sub-buffer */
2431 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2432 /* Clean up stream from consumer and free it. */
2433 lttng_poll_del(&events, stream->wait_fd);
2434 consumer_del_metadata_stream(stream, metadata_ht);
2435 }
2436 } else if (revents & (LPOLLERR | LPOLLHUP)) {
e316aad5 2437 DBG("Metadata fd %d is hup|err.", pollfd);
fa29bfbf 2438 if (!stream->hangup_flush_done &&
28ab034a
JG
2439 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2440 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
fb3a43a9
DG
2441 DBG("Attempting to flush and consume the UST buffers");
2442 lttng_ustconsumer_on_stream_hangup(stream);
2443
2444 /* We just flushed the stream now read it. */
4bb94b75 2445 do {
9ce5646a
MD
2446 health_code_update();
2447
6f9449c2 2448 len = ctx->on_buffer_ready(stream, ctx, false);
4bb94b75 2449 /*
28ab034a
JG
2450 * We don't check the return value here since if we
2451 * get a negative len, it means an error occurred
2452 * thus we simply remove it from the poll set and
2453 * free the stream.
4bb94b75
DG
2454 */
2455 } while (len > 0);
fb3a43a9
DG
2456 }
2457
fb3a43a9 2458 lttng_poll_del(&events, stream->wait_fd);
e316aad5
DG
2459 /*
2460 * This call update the channel states, closes file descriptors
2461 * and securely free the stream.
2462 */
2463 consumer_del_metadata_stream(stream, metadata_ht);
03e43155
MD
2464 } else {
2465 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
03e43155 2466 goto end;
fb3a43a9 2467 }
e316aad5 2468 /* Release RCU lock for the stream looked up */
fb3a43a9
DG
2469 }
2470 }
2471
1fc79fb4
MD
2472 /* All is OK */
2473 err = 0;
fb3a43a9
DG
2474end:
2475 DBG("Metadata poll thread exiting");
fb3a43a9 2476
d8ef542d
MD
2477 lttng_poll_clean(&events);
2478end_poll:
2d57de81 2479error_testpoint:
1fc79fb4
MD
2480 if (err) {
2481 health_error();
2482 ERR("Health error occurred in %s", __func__);
2483 }
2484 health_unregister(health_consumerd);
fb3a43a9 2485 rcu_unregister_thread();
cd9adb8b 2486 return nullptr;
fb3a43a9
DG
2487}
2488
3bd1e081 2489/*
e4421fec 2490 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2491 * it to tracefile if necessary.
2492 */
7d980def 2493void *consumer_thread_data_poll(void *data)
3bd1e081 2494{
ff930959 2495 int num_rdy, high_prio, ret, i, err = -1;
cd9adb8b 2496 struct pollfd *pollfd = nullptr;
3bd1e081 2497 /* local view of the streams */
cd9adb8b 2498 struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
3bd1e081 2499 /* local view of consumer_data.fds_count */
8bdcc002
JG
2500 int nb_fd = 0;
2501 /* 2 for the consumer_data_pipe and wake up pipe */
2502 const int nb_pipes_fd = 2;
9a2fcf78
JD
2503 /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
2504 int nb_inactive_fd = 0;
97535efa 2505 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
00e2e675 2506 ssize_t len;
3bd1e081 2507
e7b994a3
DG
2508 rcu_register_thread();
2509
1fc79fb4
MD
2510 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2511
2d57de81
MD
2512 if (testpoint(consumerd_thread_data)) {
2513 goto error_testpoint;
2514 }
2515
9ce5646a
MD
2516 health_code_update();
2517
64803277 2518 local_stream = zmalloc<lttng_consumer_stream *>();
cd9adb8b 2519 if (local_stream == nullptr) {
4df6c8cb
MD
2520 PERROR("local_stream malloc");
2521 goto end;
2522 }
3bd1e081 2523
cd9adb8b 2524 while (true) {
9ce5646a
MD
2525 health_code_update();
2526
3bd1e081 2527 high_prio = 0;
3bd1e081
MD
2528
2529 /*
e4421fec 2530 * the fds set has been updated, we need to update our
3bd1e081
MD
2531 * local array as well
2532 */
fa29bfbf
SM
2533 pthread_mutex_lock(&the_consumer_data.lock);
2534 if (the_consumer_data.need_update) {
0e428499 2535 free(pollfd);
cd9adb8b 2536 pollfd = nullptr;
0e428499
DG
2537
2538 free(local_stream);
cd9adb8b 2539 local_stream = nullptr;
3bd1e081 2540
8bdcc002 2541 /* Allocate for all fds */
28ab034a
JG
2542 pollfd =
2543 calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2544 if (pollfd == nullptr) {
7a57cf92 2545 PERROR("pollfd malloc");
fa29bfbf 2546 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2547 goto end;
2548 }
2549
28ab034a
JG
2550 local_stream = calloc<lttng_consumer_stream *>(
2551 the_consumer_data.stream_count + nb_pipes_fd);
cd9adb8b 2552 if (local_stream == nullptr) {
7a57cf92 2553 PERROR("local_stream malloc");
fa29bfbf 2554 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2555 goto end;
2556 }
28ab034a
JG
2557 ret = update_poll_array(
2558 ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
3bd1e081
MD
2559 if (ret < 0) {
2560 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2561 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
fa29bfbf 2562 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081
MD
2563 goto end;
2564 }
2565 nb_fd = ret;
fa29bfbf 2566 the_consumer_data.need_update = 0;
3bd1e081 2567 }
fa29bfbf 2568 pthread_mutex_unlock(&the_consumer_data.lock);
3bd1e081 2569
4078b776 2570 /* No FDs and consumer_quit, consumer_cleanup the thread */
28ab034a
JG
2571 if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
2572 err = 0; /* All is OK */
4078b776
MD
2573 goto end;
2574 }
3bd1e081 2575 /* poll on the array of fds */
88f2b785 2576 restart:
261de637 2577 DBG("polling on %d fd", nb_fd + nb_pipes_fd);
cf0bcb51
JG
2578 if (testpoint(consumerd_thread_data_poll)) {
2579 goto end;
2580 }
9ce5646a 2581 health_poll_entry();
261de637 2582 num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
9ce5646a 2583 health_poll_exit();
3bd1e081
MD
2584 DBG("poll num_rdy : %d", num_rdy);
2585 if (num_rdy == -1) {
88f2b785
MD
2586 /*
2587 * Restart interrupted system call.
2588 */
2589 if (errno == EINTR) {
2590 goto restart;
2591 }
7a57cf92 2592 PERROR("Poll error");
f73fabfd 2593 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2594 goto end;
2595 } else if (num_rdy == 0) {
2596 DBG("Polling thread timed out");
2597 goto end;
2598 }
2599
80957876
JG
2600 if (caa_unlikely(data_consumption_paused)) {
2601 DBG("Data consumption paused, sleeping...");
2602 sleep(1);
2603 goto restart;
2604 }
2605
3bd1e081 2606 /*
50f8ae69 2607 * If the consumer_data_pipe triggered poll go directly to the
00e2e675
DG
2608 * beginning of the loop to update the array. We want to prioritize
2609 * array update over low-priority reads.
3bd1e081 2610 */
509bb1cf 2611 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
ab30f567 2612 ssize_t pipe_readlen;
04fdd819 2613
50f8ae69 2614 DBG("consumer_data_pipe wake up");
5c7248cd
JG
2615 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2616 &new_stream,
2617 sizeof(new_stream)); /* NOLINT sizeof used on
2618 a pointer. */
2619 if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
2620 */
6cd525e8 2621 PERROR("Consumer data pipe");
23f5f35d
DG
2622 /* Continue so we can at least handle the current stream(s). */
2623 continue;
2624 }
c869f647
DG
2625
2626 /*
2627 * If the stream is NULL, just ignore it. It's also possible that
2628 * the sessiond poll thread changed the consumer_quit state and is
2629 * waking us up to test it.
2630 */
cd9adb8b 2631 if (new_stream == nullptr) {
8994307f 2632 validate_endpoint_status_data_stream();
c869f647
DG
2633 continue;
2634 }
2635
c869f647 2636 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2637 continue;
2638 }
2639
02b3d176
DG
2640 /* Handle wakeup pipe. */
2641 if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
2642 char dummy;
2643 ssize_t pipe_readlen;
2644
28ab034a
JG
2645 pipe_readlen =
2646 lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
02b3d176
DG
2647 if (pipe_readlen < 0) {
2648 PERROR("Consumer data wakeup pipe");
2649 }
2650 /* We've been awakened to handle stream(s). */
2651 ctx->has_wakeup = 0;
2652 }
2653
3bd1e081
MD
2654 /* Take care of high priority channels first. */
2655 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2656 health_code_update();
2657
cd9adb8b 2658 if (local_stream[i] == nullptr) {
9617607b
DG
2659 continue;
2660 }
fb3a43a9 2661 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2662 DBG("Urgent read on fd %d", pollfd[i].fd);
2663 high_prio = 1;
6f9449c2 2664 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
d41f73b7 2665 /* it's ok to have an unavailable sub-buffer */
b64403e3 2666 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2667 /* Clean the stream and free it. */
2668 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2669 local_stream[i] = nullptr;
4078b776 2670 } else if (len > 0) {
28ab034a
JG
2671 local_stream[i]->has_data_left_to_be_read_before_teardown =
2672 1;
d41f73b7 2673 }
3bd1e081
MD
2674 }
2675 }
2676
4078b776
MD
2677 /*
2678 * If we read high prio channel in this loop, try again
2679 * for more high prio data.
2680 */
2681 if (high_prio) {
3bd1e081
MD
2682 continue;
2683 }
2684
2685 /* Take care of low priority channels. */
4078b776 2686 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2687 health_code_update();
2688
cd9adb8b 2689 if (local_stream[i] == nullptr) {
9617607b
DG
2690 continue;
2691 }
28ab034a
JG
2692 if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
2693 local_stream[i]->has_data) {
4078b776 2694 DBG("Normal read on fd %d", pollfd[i].fd);
6f9449c2 2695 len = ctx->on_buffer_ready(local_stream[i], ctx, false);
4078b776 2696 /* it's ok to have an unavailable sub-buffer */
b64403e3 2697 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2698 /* Clean the stream and free it. */
2699 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2700 local_stream[i] = nullptr;
4078b776 2701 } else if (len > 0) {
28ab034a
JG
2702 local_stream[i]->has_data_left_to_be_read_before_teardown =
2703 1;
4078b776
MD
2704 }
2705 }
2706 }
2707
2708 /* Handle hangup and errors */
2709 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2710 health_code_update();
2711
cd9adb8b 2712 if (local_stream[i] == nullptr) {
9617607b
DG
2713 continue;
2714 }
28ab034a
JG
2715 if (!local_stream[i]->hangup_flush_done &&
2716 (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
2717 (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
2718 the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
4078b776 2719 DBG("fd %d is hup|err|nval. Attempting flush and read.",
28ab034a 2720 pollfd[i].fd);
4078b776
MD
2721 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2722 /* Attempt read again, for the data we just flushed. */
c715ddc9 2723 local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
4078b776
MD
2724 }
2725 /*
c715ddc9
JG
2726 * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
2727 * performed. This type of flush ensures that a new packet is produced no
2728 * matter the consumed/produced positions are.
2729 *
2730 * This, in turn, causes the next pass to see that data available for the
2731 * stream. When we come back here, we can be assured that all available
2732 * data has been consumed and we can finally destroy the stream.
2733 *
4078b776
MD
2734 * If the poll flag is HUP/ERR/NVAL and we have
2735 * read no data in this pass, we can remove the
2736 * stream from its hash table.
2737 */
2738 if ((pollfd[i].revents & POLLHUP)) {
2739 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
c715ddc9 2740 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2741 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2742 local_stream[i] = nullptr;
4078b776
MD
2743 }
2744 } else if (pollfd[i].revents & POLLERR) {
2745 ERR("Error returned in polling fd %d.", pollfd[i].fd);
c715ddc9 2746 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2747 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2748 local_stream[i] = nullptr;
4078b776
MD
2749 }
2750 } else if (pollfd[i].revents & POLLNVAL) {
2751 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
c715ddc9 2752 if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
43c34bc3 2753 consumer_del_stream(local_stream[i], data_ht);
cd9adb8b 2754 local_stream[i] = nullptr;
3bd1e081
MD
2755 }
2756 }
cd9adb8b 2757 if (local_stream[i] != nullptr) {
c715ddc9 2758 local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
9617607b 2759 }
3bd1e081
MD
2760 }
2761 }
1fc79fb4
MD
2762 /* All is OK */
2763 err = 0;
3bd1e081
MD
2764end:
2765 DBG("polling thread exiting");
0e428499
DG
2766 free(pollfd);
2767 free(local_stream);
fb3a43a9
DG
2768
2769 /*
2770 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2771 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2772 * read side of the pipe. If we close them both, epoll_wait strangely does
2773 * not return and could create a endless wait period if the pipe is the
2774 * only tracked fd in the poll set. The thread will take care of closing
2775 * the read side.
fb3a43a9 2776 */
13886d2d 2777 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
fb3a43a9 2778
2d57de81 2779error_testpoint:
1fc79fb4
MD
2780 if (err) {
2781 health_error();
2782 ERR("Health error occurred in %s", __func__);
2783 }
2784 health_unregister(health_consumerd);
2785
e7b994a3 2786 rcu_unregister_thread();
cd9adb8b 2787 return nullptr;
3bd1e081
MD
2788}
2789
d8ef542d
MD
2790/*
2791 * Close wake-up end of each stream belonging to the channel. This will
2792 * allow the poll() on the stream read-side to detect when the
2793 * write-side (application) finally closes them.
2794 */
28ab034a 2795static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
d8ef542d
MD
2796{
2797 struct lttng_ht *ht;
2798 struct lttng_consumer_stream *stream;
2799 struct lttng_ht_iter iter;
2800
fa29bfbf 2801 ht = the_consumer_data.stream_per_chan_id_ht;
d8ef542d 2802
56047f5a 2803 lttng::urcu::read_lock_guard read_lock;
d8ef542d 2804 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
2805 ht->hash_fct(&channel->key, lttng_ht_seed),
2806 ht->match_fct,
2807 &channel->key,
2808 &iter.iter,
2809 stream,
2810 node_channel_id.node)
2811 {
f2ad556d
MD
2812 /*
2813 * Protect against teardown with mutex.
2814 */
2815 pthread_mutex_lock(&stream->lock);
2816 if (cds_lfht_is_node_deleted(&stream->node.node)) {
2817 goto next;
2818 }
fa29bfbf 2819 switch (the_consumer_data.type) {
d8ef542d
MD
2820 case LTTNG_CONSUMER_KERNEL:
2821 break;
2822 case LTTNG_CONSUMER32_UST:
2823 case LTTNG_CONSUMER64_UST:
b4a650f3
DG
2824 if (stream->metadata_flag) {
2825 /* Safe and protected by the stream lock. */
2826 lttng_ustconsumer_close_metadata(stream->chan);
2827 } else {
2828 /*
2829 * Note: a mutex is taken internally within
2830 * liblttng-ust-ctl to protect timer wakeup_fd
2831 * use from concurrent close.
2832 */
2833 lttng_ustconsumer_close_stream_wakeup(stream);
2834 }
d8ef542d
MD
2835 break;
2836 default:
2837 ERR("Unknown consumer_data type");
a0377dfe 2838 abort();
d8ef542d 2839 }
f2ad556d
MD
2840 next:
2841 pthread_mutex_unlock(&stream->lock);
d8ef542d 2842 }
d8ef542d
MD
2843}
2844
2845static void destroy_channel_ht(struct lttng_ht *ht)
2846{
2847 struct lttng_ht_iter iter;
2848 struct lttng_consumer_channel *channel;
2849 int ret;
2850
cd9adb8b 2851 if (ht == nullptr) {
d8ef542d
MD
2852 return;
2853 }
2854
56047f5a
JG
2855 {
2856 lttng::urcu::read_lock_guard read_lock;
2857
2858 cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
2859 ret = lttng_ht_del(ht, &iter);
2860 LTTNG_ASSERT(ret != 0);
2861 }
d8ef542d 2862 }
d8ef542d
MD
2863
2864 lttng_ht_destroy(ht);
2865}
2866
2867/*
2868 * This thread polls the channel fds to detect when they are being
2869 * closed. It closes all related streams if the channel is detected as
2870 * closed. It is currently only used as a shim layer for UST because the
2871 * consumerd needs to keep the per-stream wakeup end of pipes open for
2872 * periodical flush.
2873 */
2874void *consumer_thread_channel_poll(void *data)
2875{
1fc79fb4 2876 int ret, i, pollfd, err = -1;
d8ef542d 2877 uint32_t revents, nb_fd;
cd9adb8b 2878 struct lttng_consumer_channel *chan = nullptr;
d8ef542d
MD
2879 struct lttng_ht_iter iter;
2880 struct lttng_ht_node_u64 *node;
2881 struct lttng_poll_event events;
97535efa 2882 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
d8ef542d
MD
2883 struct lttng_ht *channel_ht;
2884
2885 rcu_register_thread();
2886
1fc79fb4
MD
2887 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
2888
2d57de81
MD
2889 if (testpoint(consumerd_thread_channel)) {
2890 goto error_testpoint;
2891 }
2892
9ce5646a
MD
2893 health_code_update();
2894
d8ef542d
MD
2895 channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2896 if (!channel_ht) {
2897 /* ENOMEM at this point. Better to bail out. */
2898 goto end_ht;
2899 }
2900
2901 DBG("Thread channel poll started");
2902
2903 /* Size is set to 1 for the consumer_channel pipe */
2904 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2905 if (ret < 0) {
2906 ERR("Poll set creation failed");
2907 goto end_poll;
2908 }
2909
2910 ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
2911 if (ret < 0) {
2912 goto end;
2913 }
2914
2915 /* Main loop */
2916 DBG("Channel main loop started");
2917
cd9adb8b 2918 while (true) {
28ab034a 2919 restart:
7fa2082e
MD
2920 health_code_update();
2921 DBG("Channel poll wait");
9ce5646a 2922 health_poll_entry();
d8ef542d 2923 ret = lttng_poll_wait(&events, -1);
28ab034a 2924 DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2925 health_poll_exit();
40063ead 2926 DBG("Channel event caught in thread");
d8ef542d
MD
2927 if (ret < 0) {
2928 if (errno == EINTR) {
40063ead 2929 ERR("Poll EINTR caught");
d8ef542d
MD
2930 goto restart;
2931 }
d9607cd7 2932 if (LTTNG_POLL_GETNB(&events) == 0) {
28ab034a 2933 err = 0; /* All is OK */
d9607cd7 2934 }
d8ef542d
MD
2935 goto end;
2936 }
2937
2938 nb_fd = ret;
2939
2940 /* From here, the event is a channel wait fd */
2941 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2942 health_code_update();
2943
d8ef542d
MD
2944 revents = LTTNG_POLL_GETEV(&events, i);
2945 pollfd = LTTNG_POLL_GETFD(&events, i);
2946
d8ef542d 2947 if (pollfd == ctx->consumer_channel_pipe[0]) {
03e43155 2948 if (revents & LPOLLIN) {
d8ef542d 2949 enum consumer_channel_action action;
a0cbdd2e 2950 uint64_t key;
d8ef542d 2951
a0cbdd2e 2952 ret = read_channel_pipe(ctx, &chan, &key, &action);
d8ef542d 2953 if (ret <= 0) {
03e43155
MD
2954 if (ret < 0) {
2955 ERR("Error reading channel pipe");
2956 }
28ab034a
JG
2957 lttng_poll_del(&events,
2958 ctx->consumer_channel_pipe[0]);
d8ef542d
MD
2959 continue;
2960 }
2961
2962 switch (action) {
2963 case CONSUMER_CHANNEL_ADD:
56047f5a 2964 {
28ab034a 2965 DBG("Adding channel %d to poll set", chan->wait_fd);
d8ef542d
MD
2966
2967 lttng_ht_node_init_u64(&chan->wait_fd_node,
28ab034a 2968 chan->wait_fd);
56047f5a 2969 lttng::urcu::read_lock_guard read_lock;
d8ef542d 2970 lttng_ht_add_unique_u64(channel_ht,
28ab034a 2971 &chan->wait_fd_node);
d8ef542d 2972 /* Add channel to the global poll events list */
28ab034a
JG
2973 // FIXME: Empty flag on a pipe pollset, this might
2974 // hang on FreeBSD.
1524f98c 2975 lttng_poll_add(&events, chan->wait_fd, 0);
d8ef542d 2976 break;
56047f5a 2977 }
a0cbdd2e
MD
2978 case CONSUMER_CHANNEL_DEL:
2979 {
b4a650f3 2980 /*
28ab034a
JG
2981 * This command should never be called if the
2982 * channel has streams monitored by either the data
2983 * or metadata thread. The consumer only notify this
2984 * thread with a channel del. command if it receives
2985 * a destroy channel command from the session daemon
2986 * that send it if a command prior to the
2987 * GET_CHANNEL failed.
b4a650f3
DG
2988 */
2989
56047f5a 2990 lttng::urcu::read_lock_guard read_lock;
a0cbdd2e
MD
2991 chan = consumer_find_channel(key);
2992 if (!chan) {
28ab034a
JG
2993 ERR("UST consumer get channel key %" PRIu64
2994 " not found for del channel",
2995 key);
a0cbdd2e
MD
2996 break;
2997 }
2998 lttng_poll_del(&events, chan->wait_fd);
f623cc0b 2999 iter.iter.node = &chan->wait_fd_node.node;
a0cbdd2e 3000 ret = lttng_ht_del(channel_ht, &iter);
a0377dfe 3001 LTTNG_ASSERT(ret == 0);
a0cbdd2e 3002
fa29bfbf 3003 switch (the_consumer_data.type) {
f2a444f1
DG
3004 case LTTNG_CONSUMER_KERNEL:
3005 break;
3006 case LTTNG_CONSUMER32_UST:
3007 case LTTNG_CONSUMER64_UST:
212d67a2 3008 health_code_update();
28ab034a
JG
3009 /* Destroy streams that might have been left
3010 * in the stream list. */
212d67a2 3011 clean_channel_stream_list(chan);
f2a444f1
DG
3012 break;
3013 default:
3014 ERR("Unknown consumer_data type");
a0377dfe 3015 abort();
f2a444f1
DG
3016 }
3017
a0cbdd2e 3018 /*
28ab034a
JG
3019 * Release our own refcount. Force channel deletion
3020 * even if streams were not initialized.
a0cbdd2e
MD
3021 */
3022 if (!uatomic_sub_return(&chan->refcount, 1)) {
3023 consumer_del_channel(chan);
3024 }
3025 goto restart;
3026 }
d8ef542d
MD
3027 case CONSUMER_CHANNEL_QUIT:
3028 /*
28ab034a
JG
3029 * Remove the pipe from the poll set and continue
3030 * the loop since their might be data to consume.
d8ef542d 3031 */
28ab034a
JG
3032 lttng_poll_del(&events,
3033 ctx->consumer_channel_pipe[0]);
d8ef542d
MD
3034 continue;
3035 default:
3036 ERR("Unknown action");
3037 break;
3038 }
03e43155
MD
3039 } else if (revents & (LPOLLERR | LPOLLHUP)) {
3040 DBG("Channel thread pipe hung up");
3041 /*
3042 * Remove the pipe from the poll set and continue the loop
3043 * since their might be data to consume.
3044 */
3045 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
3046 continue;
3047 } else {
28ab034a
JG
3048 ERR("Unexpected poll events %u for sock %d",
3049 revents,
3050 pollfd);
03e43155 3051 goto end;
d8ef542d
MD
3052 }
3053
3054 /* Handle other stream */
3055 continue;
3056 }
3057
56047f5a 3058 lttng::urcu::read_lock_guard read_lock;
d8ef542d
MD
3059 {
3060 uint64_t tmp_id = (uint64_t) pollfd;
3061
3062 lttng_ht_lookup(channel_ht, &tmp_id, &iter);
3063 }
3064 node = lttng_ht_iter_get_node_u64(&iter);
a0377dfe 3065 LTTNG_ASSERT(node);
d8ef542d 3066
28ab034a 3067 chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
d8ef542d
MD
3068
3069 /* Check for error event */
3070 if (revents & (LPOLLERR | LPOLLHUP)) {
3071 DBG("Channel fd %d is hup|err.", pollfd);
3072
3073 lttng_poll_del(&events, chan->wait_fd);
3074 ret = lttng_ht_del(channel_ht, &iter);
a0377dfe 3075 LTTNG_ASSERT(ret == 0);
b4a650f3
DG
3076
3077 /*
3078 * This will close the wait fd for each stream associated to
3079 * this channel AND monitored by the data/metadata thread thus
3080 * will be clean by the right thread.
3081 */
d8ef542d 3082 consumer_close_channel_streams(chan);
f2ad556d
MD
3083
3084 /* Release our own refcount */
28ab034a
JG
3085 if (!uatomic_sub_return(&chan->refcount, 1) &&
3086 !uatomic_read(&chan->nb_init_stream_left)) {
f2ad556d
MD
3087 consumer_del_channel(chan);
3088 }
03e43155
MD
3089 } else {
3090 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
03e43155 3091 goto end;
d8ef542d
MD
3092 }
3093
3094 /* Release RCU lock for the channel looked up */
d8ef542d
MD
3095 }
3096 }
3097
1fc79fb4
MD
3098 /* All is OK */
3099 err = 0;
d8ef542d
MD
3100end:
3101 lttng_poll_clean(&events);
3102end_poll:
3103 destroy_channel_ht(channel_ht);
3104end_ht:
2d57de81 3105error_testpoint:
d8ef542d 3106 DBG("Channel poll thread exiting");
1fc79fb4
MD
3107 if (err) {
3108 health_error();
3109 ERR("Health error occurred in %s", __func__);
3110 }
3111 health_unregister(health_consumerd);
d8ef542d 3112 rcu_unregister_thread();
cd9adb8b 3113 return nullptr;
d8ef542d
MD
3114}
3115
331744e3 3116static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
28ab034a
JG
3117 struct pollfd *sockpoll,
3118 int client_socket)
331744e3
JD
3119{
3120 int ret;
3121
a0377dfe
FD
3122 LTTNG_ASSERT(ctx);
3123 LTTNG_ASSERT(sockpoll);
331744e3 3124
84382d49
MD
3125 ret = lttng_consumer_poll_socket(sockpoll);
3126 if (ret) {
331744e3
JD
3127 goto error;
3128 }
3129 DBG("Metadata connection on client_socket");
3130
3131 /* Blocking call, waiting for transmission */
3132 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3133 if (ctx->consumer_metadata_socket < 0) {
3134 WARN("On accept metadata");
3135 ret = -1;
3136 goto error;
3137 }
3138 ret = 0;
3139
3140error:
3141 return ret;
3142}
3143
3bd1e081
MD
3144/*
3145 * This thread listens on the consumerd socket and receives the file
3146 * descriptors from the session daemon.
3147 */
7d980def 3148void *consumer_thread_sessiond_poll(void *data)
3bd1e081 3149{
1fc79fb4 3150 int sock = -1, client_socket, ret, err = -1;
3bd1e081
MD
3151 /*
3152 * structure to poll for incoming data on communication socket avoids
3153 * making blocking sockets.
3154 */
3155 struct pollfd consumer_sockpoll[2];
97535efa 3156 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
3bd1e081 3157
e7b994a3
DG
3158 rcu_register_thread();
3159
1fc79fb4
MD
3160 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
3161
2d57de81
MD
3162 if (testpoint(consumerd_thread_sessiond)) {
3163 goto error_testpoint;
3164 }
3165
9ce5646a
MD
3166 health_code_update();
3167
3bd1e081
MD
3168 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
3169 unlink(ctx->consumer_command_sock_path);
3170 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
3171 if (client_socket < 0) {
3172 ERR("Cannot create command socket");
3173 goto end;
3174 }
3175
3176 ret = lttcomm_listen_unix_sock(client_socket);
3177 if (ret < 0) {
3178 goto end;
3179 }
3180
32258573 3181 DBG("Sending ready command to lttng-sessiond");
f73fabfd 3182 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
3bd1e081
MD
3183 /* return < 0 on error, but == 0 is not fatal */
3184 if (ret < 0) {
32258573 3185 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
3186 goto end;
3187 }
3188
3bd1e081
MD
3189 /* prepare the FDs to poll : to client socket and the should_quit pipe */
3190 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
3191 consumer_sockpoll[0].events = POLLIN | POLLPRI;
3192 consumer_sockpoll[1].fd = client_socket;
3193 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3194
84382d49
MD
3195 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3196 if (ret) {
3197 if (ret > 0) {
3198 /* should exit */
3199 err = 0;
3200 }
3bd1e081
MD
3201 goto end;
3202 }
3203 DBG("Connection on client_socket");
3204
3205 /* Blocking call, waiting for transmission */
3206 sock = lttcomm_accept_unix_sock(client_socket);
534d2592 3207 if (sock < 0) {
3bd1e081
MD
3208 WARN("On accept");
3209 goto end;
3210 }
3bd1e081 3211
331744e3
JD
3212 /*
3213 * Setup metadata socket which is the second socket connection on the
3214 * command unix socket.
3215 */
3216 ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
84382d49
MD
3217 if (ret) {
3218 if (ret > 0) {
3219 /* should exit */
3220 err = 0;
3221 }
331744e3
JD
3222 goto end;
3223 }
3224
d96f09c6
DG
3225 /* This socket is not useful anymore. */
3226 ret = close(client_socket);
3227 if (ret < 0) {
3228 PERROR("close client_socket");
3229 }
3230 client_socket = -1;
3231
3bd1e081
MD
3232 /* update the polling structure to poll on the established socket */
3233 consumer_sockpoll[1].fd = sock;
3234 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3235
cd9adb8b 3236 while (true) {
9ce5646a
MD
3237 health_code_update();
3238
3239 health_poll_entry();
3240 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3241 health_poll_exit();
84382d49
MD
3242 if (ret) {
3243 if (ret > 0) {
3244 /* should exit */
3245 err = 0;
3246 }
3bd1e081
MD
3247 goto end;
3248 }
3249 DBG("Incoming command on sock");
3250 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
4cbc1a04
DG
3251 if (ret <= 0) {
3252 /*
3253 * This could simply be a session daemon quitting. Don't output
3254 * ERR() here.
3255 */
3256 DBG("Communication interrupted on command socket");
41ba6035 3257 err = 0;
3bd1e081
MD
3258 goto end;
3259 }
10211f5c 3260 if (CMM_LOAD_SHARED(consumer_quit)) {
3bd1e081 3261 DBG("consumer_thread_receive_fds received quit from signal");
28ab034a 3262 err = 0; /* All is OK */
3bd1e081
MD
3263 goto end;
3264 }
a1ed855a 3265 DBG("Received command on sock");
3bd1e081 3266 }
1fc79fb4
MD
3267 /* All is OK */
3268 err = 0;
3269
3bd1e081 3270end:
ffe60014 3271 DBG("Consumer thread sessiond poll exiting");
3bd1e081 3272
d88aee68
DG
3273 /*
3274 * Close metadata streams since the producer is the session daemon which
3275 * just died.
3276 *
3277 * NOTE: for now, this only applies to the UST tracer.
3278 */
6d574024 3279 lttng_consumer_close_all_metadata();
d88aee68 3280
3bd1e081
MD
3281 /*
3282 * when all fds have hung up, the polling thread
3283 * can exit cleanly
3284 */
10211f5c 3285 CMM_STORE_SHARED(consumer_quit, 1);
3bd1e081 3286
04fdd819 3287 /*
c869f647 3288 * Notify the data poll thread to poll back again and test the
8994307f 3289 * consumer_quit state that we just set so to quit gracefully.
04fdd819 3290 */
acdb9057 3291 notify_thread_lttng_pipe(ctx->consumer_data_pipe);
c869f647 3292
cd9adb8b 3293 notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);
d8ef542d 3294
5c635c72
MD
3295 notify_health_quit_pipe(health_quit_pipe);
3296
d96f09c6
DG
3297 /* Cleaning up possibly open sockets. */
3298 if (sock >= 0) {
3299 ret = close(sock);
3300 if (ret < 0) {
3301 PERROR("close sock sessiond poll");
3302 }
3303 }
3304 if (client_socket >= 0) {
38476d24 3305 ret = close(client_socket);
d96f09c6
DG
3306 if (ret < 0) {
3307 PERROR("close client_socket sessiond poll");
3308 }
3309 }
3310
2d57de81 3311error_testpoint:
1fc79fb4
MD
3312 if (err) {
3313 health_error();
3314 ERR("Health error occurred in %s", __func__);
3315 }
3316 health_unregister(health_consumerd);
3317
e7b994a3 3318 rcu_unregister_thread();
cd9adb8b 3319 return nullptr;
3bd1e081 3320}
d41f73b7 3321
503fefca 3322static int post_consume(struct lttng_consumer_stream *stream,
28ab034a
JG
3323 const struct stream_subbuffer *subbuffer,
3324 struct lttng_consumer_local_data *ctx)
f96af312 3325{
503fefca 3326 size_t i;
f96af312 3327 int ret = 0;
28ab034a
JG
3328 const size_t count =
3329 lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
f96af312 3330
503fefca
JG
3331 for (i = 0; i < count; i++) {
3332 const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
28ab034a 3333 &stream->read_subbuffer_ops.post_consume_cbs, i);
503fefca
JG
3334
3335 ret = op(stream, subbuffer, ctx);
3336 if (ret) {
3337 goto end;
f96af312 3338 }
f96af312 3339 }
f96af312
JG
3340end:
3341 return ret;
3342}
3343
4078b776 3344ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
28ab034a
JG
3345 struct lttng_consumer_local_data *ctx,
3346 bool locked_by_caller)
d41f73b7 3347{
12bddd1d 3348 ssize_t ret, written_bytes = 0;
23d56598 3349 int rotation_ret;
6f9449c2 3350 struct stream_subbuffer subbuffer = {};
b6797c8e 3351 enum get_next_subbuffer_status get_next_status;
74251bb8 3352
6f9449c2
JG
3353 if (!locked_by_caller) {
3354 stream->read_subbuffer_ops.lock(stream);
947bd097
JR
3355 } else {
3356 stream->read_subbuffer_ops.assert_locked(stream);
6f9449c2
JG
3357 }
3358
3359 if (stream->read_subbuffer_ops.on_wake_up) {
3360 ret = stream->read_subbuffer_ops.on_wake_up(stream);
3361 if (ret) {
3362 goto end;
3363 }
94d49140 3364 }
74251bb8 3365
23d56598
JG
3366 /*
3367 * If the stream was flagged to be ready for rotation before we extract
3368 * the next packet, rotate it now.
3369 */
3370 if (stream->rotate_ready) {
3371 DBG("Rotate stream before consuming data");
f46376a1 3372 ret = lttng_consumer_rotate_stream(stream);
23d56598
JG
3373 if (ret < 0) {
3374 ERR("Stream rotation error before consuming data");
3375 goto end;
3376 }
3377 }
3378
28ab034a 3379 get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
b6797c8e
JG
3380 switch (get_next_status) {
3381 case GET_NEXT_SUBBUFFER_STATUS_OK:
3382 break;
3383 case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
3384 /* Not an error. */
3385 ret = 0;
3386 goto sleep_stream;
3387 case GET_NEXT_SUBBUFFER_STATUS_ERROR:
3388 ret = -1;
6f9449c2 3389 goto end;
b6797c8e
JG
3390 default:
3391 abort();
d41f73b7 3392 }
74251bb8 3393
28ab034a 3394 ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
6f9449c2
JG
3395 if (ret) {
3396 goto error_put_subbuf;
3397 }
3398
28ab034a 3399 written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
514775d9
FD
3400 if (written_bytes <= 0) {
3401 ERR("Error consuming subbuffer: (%zd)", written_bytes);
3402 ret = (int) written_bytes;
3403 goto error_put_subbuf;
6f9449c2
JG
3404 }
3405
3406 ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
3407 if (ret) {
23d56598
JG
3408 goto end;
3409 }
3410
503fefca
JG
3411 ret = post_consume(stream, &subbuffer, ctx);
3412 if (ret) {
3413 goto end;
6f9449c2
JG
3414 }
3415
23d56598
JG
3416 /*
3417 * After extracting the packet, we check if the stream is now ready to
3418 * be rotated and perform the action immediately.
3419 *
3420 * Don't overwrite `ret` as callers expect the number of bytes
3421 * consumed to be returned on success.
3422 */
3423 rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
3424 if (rotation_ret == 1) {
f46376a1 3425 rotation_ret = lttng_consumer_rotate_stream(stream);
23d56598
JG
3426 if (rotation_ret < 0) {
3427 ret = rotation_ret;
3428 ERR("Stream rotation error after consuming data");
3429 goto end;
3430 }
503fefca 3431
23d56598
JG
3432 } else if (rotation_ret < 0) {
3433 ret = rotation_ret;
3434 ERR("Failed to check if stream was ready to rotate after consuming data");
3435 goto end;
3436 }
3437
82e72193 3438sleep_stream:
6f9449c2
JG
3439 if (stream->read_subbuffer_ops.on_sleep) {
3440 stream->read_subbuffer_ops.on_sleep(stream, ctx);
3441 }
3442
3443 ret = written_bytes;
23d56598 3444end:
6f9449c2
JG
3445 if (!locked_by_caller) {
3446 stream->read_subbuffer_ops.unlock(stream);
94d49140 3447 }
6f9449c2 3448
74251bb8 3449 return ret;
6f9449c2
JG
3450error_put_subbuf:
3451 (void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
3452 goto end;
d41f73b7
MD
3453}
3454
3455int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3456{
fa29bfbf 3457 switch (the_consumer_data.type) {
d41f73b7
MD
3458 case LTTNG_CONSUMER_KERNEL:
3459 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
3460 case LTTNG_CONSUMER32_UST:
3461 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
3462 return lttng_ustconsumer_on_recv_stream(stream);
3463 default:
3464 ERR("Unknown consumer_data type");
a0377dfe 3465 abort();
d41f73b7
MD
3466 return -ENOSYS;
3467 }
3468}
e4421fec
DG
3469
3470/*
3471 * Allocate and set consumer data hash tables.
3472 */
cd9adb8b 3473int lttng_consumer_init()
e4421fec 3474{
fa29bfbf
SM
3475 the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3476 if (!the_consumer_data.channel_ht) {
282dadbc
MD
3477 goto error;
3478 }
3479
28ab034a 3480 the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3481 if (!the_consumer_data.channels_by_session_id_ht) {
5c3892a6
JG
3482 goto error;
3483 }
3484
fa29bfbf
SM
3485 the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3486 if (!the_consumer_data.relayd_ht) {
282dadbc
MD
3487 goto error;
3488 }
3489
fa29bfbf
SM
3490 the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3491 if (!the_consumer_data.stream_list_ht) {
282dadbc
MD
3492 goto error;
3493 }
3494
28ab034a 3495 the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
fa29bfbf 3496 if (!the_consumer_data.stream_per_chan_id_ht) {
282dadbc
MD
3497 goto error;
3498 }
3499
3500 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3501 if (!data_ht) {
3502 goto error;
3503 }
3504
3505 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3506 if (!metadata_ht) {
3507 goto error;
3508 }
3509
fa29bfbf
SM
3510 the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3511 if (!the_consumer_data.chunk_registry) {
28cc88f3
JG
3512 goto error;
3513 }
3514
282dadbc
MD
3515 return 0;
3516
3517error:
3518 return -1;
e4421fec 3519}
7735ef9e
DG
3520
3521/*
3522 * Process the ADD_RELAYD command receive by a consumer.
3523 *
3524 * This will create a relayd socket pair and add it to the relayd hash table.
3525 * The caller MUST acquire a RCU read side lock before calling it.
3526 */
4222116f 3527void consumer_add_relayd_socket(uint64_t net_seq_idx,
28ab034a
JG
3528 int sock_type,
3529 struct lttng_consumer_local_data *ctx,
3530 int sock,
3531 struct pollfd *consumer_sockpoll,
3532 uint64_t sessiond_id,
3533 uint64_t relayd_session_id,
3534 uint32_t relayd_version_major,
3535 uint32_t relayd_version_minor,
3536 enum lttcomm_sock_proto relayd_socket_protocol)
7735ef9e 3537{
cd2b09ed 3538 int fd = -1, ret = -1, relayd_created = 0;
0c759fc9 3539 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
cd9adb8b 3540 struct consumer_relayd_sock_pair *relayd = nullptr;
7735ef9e 3541
a0377dfe 3542 LTTNG_ASSERT(ctx);
4222116f 3543 LTTNG_ASSERT(sock >= 0);
48b7cdc2 3544 ASSERT_RCU_READ_LOCKED();
6151a90f 3545
da009f2c 3546 DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
7735ef9e
DG
3547
3548 /* Get relayd reference if exists. */
3549 relayd = consumer_find_relayd(net_seq_idx);
cd9adb8b 3550 if (relayd == nullptr) {
a0377dfe 3551 LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
7735ef9e
DG
3552 /* Not found. Allocate one. */
3553 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
cd9adb8b 3554 if (relayd == nullptr) {
618a6a28
MD
3555 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
3556 goto error;
0d08d75e 3557 } else {
30319bcb 3558 relayd->sessiond_session_id = sessiond_id;
0d08d75e 3559 relayd_created = 1;
7735ef9e 3560 }
0d08d75e
DG
3561
3562 /*
3563 * This code path MUST continue to the consumer send status message to
3564 * we can notify the session daemon and continue our work without
3565 * killing everything.
3566 */
da009f2c
MD
3567 } else {
3568 /*
3569 * relayd key should never be found for control socket.
3570 */
a0377dfe 3571 LTTNG_ASSERT(sock_type != LTTNG_STREAM_CONTROL);
0d08d75e
DG
3572 }
3573
3574 /* First send a status message before receiving the fds. */
0c759fc9 3575 ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
618a6a28 3576 if (ret < 0) {
0d08d75e 3577 /* Somehow, the session daemon is not responding anymore. */
618a6a28
MD
3578 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3579 goto error_nosignal;
7735ef9e
DG
3580 }
3581
3582 /* Poll on consumer socket. */
84382d49
MD
3583 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3584 if (ret) {
3585 /* Needing to exit in the middle of a command: error. */
0d08d75e 3586 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
618a6a28 3587 goto error_nosignal;
7735ef9e
DG
3588 }
3589
3590 /* Get relayd socket from session daemon */
3591 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
3592 if (ret != sizeof(fd)) {
28ab034a 3593 fd = -1; /* Just in case it gets set with an invalid value. */
0d08d75e
DG
3594
3595 /*
3596 * Failing to receive FDs might indicate a major problem such as
3597 * reaching a fd limit during the receive where the kernel returns a
3598 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
3599 * don't take any chances and stop everything.
3600 *
3601 * XXX: Feature request #558 will fix that and avoid this possible
3602 * issue when reaching the fd limit.
3603 */
3604 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
618a6a28 3605 ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
f50f23d9
DG
3606 goto error;
3607 }
3608
7735ef9e
DG
3609 /* Copy socket information and received FD */
3610 switch (sock_type) {
3611 case LTTNG_STREAM_CONTROL:
3612 /* Copy received lttcomm socket */
4222116f 3613 ret = lttcomm_populate_sock_from_open_socket(
28ab034a 3614 &relayd->control_sock.sock, fd, relayd_socket_protocol);
7735ef9e 3615
6151a90f 3616 /* Assign version values. */
4222116f
JR
3617 relayd->control_sock.major = relayd_version_major;
3618 relayd->control_sock.minor = relayd_version_minor;
c5b6f4f0 3619
d3e2ba59 3620 relayd->relayd_session_id = relayd_session_id;
c5b6f4f0 3621
7735ef9e
DG
3622 break;
3623 case LTTNG_STREAM_DATA:
3624 /* Copy received lttcomm socket */
4222116f 3625 ret = lttcomm_populate_sock_from_open_socket(
28ab034a 3626 &relayd->data_sock.sock, fd, relayd_socket_protocol);
6151a90f 3627 /* Assign version values. */
4222116f
JR
3628 relayd->data_sock.major = relayd_version_major;
3629 relayd->data_sock.minor = relayd_version_minor;
7735ef9e
DG
3630 break;
3631 default:
3632 ERR("Unknown relayd socket type (%d)", sock_type);
618a6a28 3633 ret_code = LTTCOMM_CONSUMERD_FATAL;
7735ef9e
DG
3634 goto error;
3635 }
3636
4222116f
JR
3637 if (ret < 0) {
3638 ret_code = LTTCOMM_CONSUMERD_FATAL;
3639 goto error;
3640 }
3641
d88aee68 3642 DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
28ab034a
JG
3643 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
3644 relayd->net_seq_idx,
3645 fd);
39d9954c
FD
3646 /*
3647 * We gave the ownership of the fd to the relayd structure. Set the
3648 * fd to -1 so we don't call close() on it in the error path below.
3649 */
3650 fd = -1;
7735ef9e 3651
618a6a28
MD
3652 /* We successfully added the socket. Send status back. */
3653 ret = consumer_send_status_msg(sock, ret_code);
3654 if (ret < 0) {
3655 /* Somehow, the session daemon is not responding anymore. */
3656 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3657 goto error_nosignal;
3658 }
3659
7735ef9e
DG
3660 /*
3661 * Add relayd socket pair to consumer data hashtable. If object already
3662 * exists or on error, the function gracefully returns.
3663 */
9276e5c8 3664 relayd->ctx = ctx;
d09e1200 3665 add_relayd(relayd);
7735ef9e
DG
3666
3667 /* All good! */
2527bf85 3668 return;
7735ef9e
DG
3669
3670error:
618a6a28
MD
3671 if (consumer_send_status_msg(sock, ret_code) < 0) {
3672 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3673 }
3674
3675error_nosignal:
4028eeb9
DG
3676 /* Close received socket if valid. */
3677 if (fd >= 0) {
3678 if (close(fd)) {
3679 PERROR("close received socket");
3680 }
3681 }
cd2b09ed
DG
3682
3683 if (relayd_created) {
cd2b09ed
DG
3684 free(relayd);
3685 }
7735ef9e 3686}
ca22feea 3687
f7079f67
DG
3688/*
3689 * Search for a relayd associated to the session id and return the reference.
3690 *
3691 * A rcu read side lock MUST be acquire before calling this function and locked
3692 * until the relayd object is no longer necessary.
3693 */
3694static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3695{
3696 struct lttng_ht_iter iter;
cd9adb8b 3697 struct consumer_relayd_sock_pair *relayd = nullptr;
f7079f67 3698
48b7cdc2
FD
3699 ASSERT_RCU_READ_LOCKED();
3700
f7079f67 3701 /* Iterate over all relayd since they are indexed by net_seq_idx. */
28ab034a 3702 cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
18261bd1
DG
3703 /*
3704 * Check by sessiond id which is unique here where the relayd session
3705 * id might not be when having multiple relayd.
3706 */
3707 if (relayd->sessiond_session_id == id) {
f7079f67 3708 /* Found the relayd. There can be only one per id. */
18261bd1 3709 goto found;
f7079f67
DG
3710 }
3711 }
3712
cd9adb8b 3713 return nullptr;
18261bd1
DG
3714
3715found:
f7079f67
DG
3716 return relayd;
3717}
3718
ca22feea
DG
3719/*
3720 * Check if for a given session id there is still data needed to be extract
3721 * from the buffers.
3722 *
6d805429 3723 * Return 1 if data is pending or else 0 meaning ready to be read.
ca22feea 3724 */
6d805429 3725int consumer_data_pending(uint64_t id)
ca22feea
DG
3726{
3727 int ret;
3728 struct lttng_ht_iter iter;
3729 struct lttng_ht *ht;
3730 struct lttng_consumer_stream *stream;
cd9adb8b 3731 struct consumer_relayd_sock_pair *relayd = nullptr;
6d805429 3732 int (*data_pending)(struct lttng_consumer_stream *);
ca22feea 3733
6d805429 3734 DBG("Consumer data pending command on session id %" PRIu64, id);
ca22feea 3735
56047f5a 3736 lttng::urcu::read_lock_guard read_lock;
fa29bfbf 3737 pthread_mutex_lock(&the_consumer_data.lock);
ca22feea 3738
fa29bfbf 3739 switch (the_consumer_data.type) {
ca22feea 3740 case LTTNG_CONSUMER_KERNEL:
6d805429 3741 data_pending = lttng_kconsumer_data_pending;
ca22feea
DG
3742 break;
3743 case LTTNG_CONSUMER32_UST:
3744 case LTTNG_CONSUMER64_UST:
6d805429 3745 data_pending = lttng_ustconsumer_data_pending;
ca22feea
DG
3746 break;
3747 default:
3748 ERR("Unknown consumer data type");
a0377dfe 3749 abort();
ca22feea
DG
3750 }
3751
3752 /* Ease our life a bit */
fa29bfbf 3753 ht = the_consumer_data.stream_list_ht;
ca22feea 3754
c8f59ee5 3755 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
3756 ht->hash_fct(&id, lttng_ht_seed),
3757 ht->match_fct,
3758 &id,
3759 &iter.iter,
3760 stream,
3761 node_session_id.node)
3762 {
bb586a6e 3763 pthread_mutex_lock(&stream->lock);
ca22feea 3764
4e9a4686
DG
3765 /*
3766 * A removed node from the hash table indicates that the stream has
3767 * been deleted thus having a guarantee that the buffers are closed
3768 * on the consumer side. However, data can still be transmitted
3769 * over the network so don't skip the relayd check.
3770 */
3771 ret = cds_lfht_is_node_deleted(&stream->node.node);
3772 if (!ret) {
3773 /* Check the stream if there is data in the buffers. */
6d805429
DG
3774 ret = data_pending(stream);
3775 if (ret == 1) {
4e9a4686 3776 pthread_mutex_unlock(&stream->lock);
f7079f67 3777 goto data_pending;
4e9a4686
DG
3778 }
3779 }
3780
d9f0c7c7
JR
3781 pthread_mutex_unlock(&stream->lock);
3782 }
3783
3784 relayd = find_relayd_by_session_id(id);
3785 if (relayd) {
3786 unsigned int is_data_inflight = 0;
3787
3788 /* Send init command for data pending. */
3789 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 3790 ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
d9f0c7c7
JR
3791 if (ret < 0) {
3792 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3793 /* Communication error thus the relayd so no data pending. */
3794 goto data_not_pending;
3795 }
3796
3797 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
3798 ht->hash_fct(&id, lttng_ht_seed),
3799 ht->match_fct,
3800 &id,
3801 &iter.iter,
3802 stream,
3803 node_session_id.node)
3804 {
c8f59ee5 3805 if (stream->metadata_flag) {
ad7051c0 3806 ret = relayd_quiescent_control(&relayd->control_sock,
28ab034a 3807 stream->relayd_stream_id);
c8f59ee5 3808 } else {
6d805429 3809 ret = relayd_data_pending(&relayd->control_sock,
28ab034a
JG
3810 stream->relayd_stream_id,
3811 stream->next_net_seq_num - 1);
c8f59ee5 3812 }
d9f0c7c7
JR
3813
3814 if (ret == 1) {
3815 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3816 goto data_pending;
3817 } else if (ret < 0) {
28ab034a
JG
3818 ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
3819 relayd->net_seq_idx);
9276e5c8
JR
3820 lttng_consumer_cleanup_relayd(relayd);
3821 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
9276e5c8
JR
3822 goto data_not_pending;
3823 }
c8f59ee5 3824 }
f7079f67 3825
d9f0c7c7 3826 /* Send end command for data pending. */
28ab034a
JG
3827 ret = relayd_end_data_pending(
3828 &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
f7079f67 3829 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
bdd88757 3830 if (ret < 0) {
28ab034a
JG
3831 ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
3832 relayd->net_seq_idx);
9276e5c8 3833 lttng_consumer_cleanup_relayd(relayd);
f7079f67
DG
3834 goto data_not_pending;
3835 }
bdd88757
DG
3836 if (is_data_inflight) {
3837 goto data_pending;
3838 }
f7079f67
DG
3839 }
3840
ca22feea 3841 /*
f7079f67
DG
3842 * Finding _no_ node in the hash table and no inflight data means that the
3843 * stream(s) have been removed thus data is guaranteed to be available for
3844 * analysis from the trace files.
ca22feea
DG
3845 */
3846
f7079f67 3847data_not_pending:
ca22feea 3848 /* Data is available to be read by a viewer. */
fa29bfbf 3849 pthread_mutex_unlock(&the_consumer_data.lock);
6d805429 3850 return 0;
ca22feea 3851
f7079f67 3852data_pending:
ca22feea 3853 /* Data is still being extracted from buffers. */
fa29bfbf 3854 pthread_mutex_unlock(&the_consumer_data.lock);
6d805429 3855 return 1;
ca22feea 3856}
f50f23d9
DG
3857
3858/*
3859 * Send a ret code status message to the sessiond daemon.
3860 *
3861 * Return the sendmsg() return value.
3862 */
3863int consumer_send_status_msg(int sock, int ret_code)
3864{
3865 struct lttcomm_consumer_status_msg msg;
3866
53efb85a 3867 memset(&msg, 0, sizeof(msg));
97535efa 3868 msg.ret_code = (lttcomm_return_code) ret_code;
f50f23d9
DG
3869
3870 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3871}
ffe60014
DG
3872
3873/*
3874 * Send a channel status message to the sessiond daemon.
3875 *
3876 * Return the sendmsg() return value.
3877 */
28ab034a 3878int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
ffe60014
DG
3879{
3880 struct lttcomm_consumer_status_channel msg;
3881
a0377dfe 3882 LTTNG_ASSERT(sock >= 0);
ffe60014 3883
53efb85a 3884 memset(&msg, 0, sizeof(msg));
ffe60014 3885 if (!channel) {
0c759fc9 3886 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
ffe60014 3887 } else {
0c759fc9 3888 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ffe60014
DG
3889 msg.key = channel->key;
3890 msg.stream_count = channel->streams.count;
3891 }
3892
3893 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3894}
5c786ded 3895
d07ceecd 3896unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
28ab034a
JG
3897 unsigned long produced_pos,
3898 uint64_t nb_packets_per_stream,
3899 uint64_t max_sb_size)
5c786ded 3900{
d07ceecd 3901 unsigned long start_pos;
5c786ded 3902
d07ceecd 3903 if (!nb_packets_per_stream) {
28ab034a 3904 return consumed_pos; /* Grab everything */
d07ceecd 3905 }
1cbd136b 3906 start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
d07ceecd
MD
3907 start_pos -= max_sb_size * nb_packets_per_stream;
3908 if ((long) (start_pos - consumed_pos) < 0) {
28ab034a 3909 return consumed_pos; /* Grab everything */
d07ceecd
MD
3910 }
3911 return start_pos;
5c786ded 3912}
a1ae2ea5 3913
c1dcb8bb
JG
3914/* Stream lock must be held by the caller. */
3915static int sample_stream_positions(struct lttng_consumer_stream *stream,
28ab034a
JG
3916 unsigned long *produced,
3917 unsigned long *consumed)
c1dcb8bb
JG
3918{
3919 int ret;
3920
3921 ASSERT_LOCKED(stream->lock);
3922
3923 ret = lttng_consumer_sample_snapshot_positions(stream);
3924 if (ret < 0) {
3925 ERR("Failed to sample snapshot positions");
3926 goto end;
3927 }
3928
3929 ret = lttng_consumer_get_produced_snapshot(stream, produced);
3930 if (ret < 0) {
3931 ERR("Failed to sample produced position");
3932 goto end;
3933 }
3934
3935 ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
3936 if (ret < 0) {
3937 ERR("Failed to sample consumed position");
3938 goto end;
3939 }
3940
3941end:
3942 return ret;
3943}
3944
b99a8d42
JD
3945/*
3946 * Sample the rotate position for all the streams of a channel. If a stream
3947 * is already at the rotate position (produced == consumed), we flag it as
3948 * ready for rotation. The rotation of ready streams occurs after we have
3949 * replied to the session daemon that we have finished sampling the positions.
92b7a7f8 3950 * Must be called with RCU read-side lock held to ensure existence of channel.
b99a8d42
JD
3951 *
3952 * Returns 0 on success, < 0 on error
3953 */
92b7a7f8 3954int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
28ab034a
JG
3955 uint64_t key,
3956 uint64_t relayd_id)
b99a8d42
JD
3957{
3958 int ret;
b99a8d42
JD
3959 struct lttng_consumer_stream *stream;
3960 struct lttng_ht_iter iter;
fa29bfbf 3961 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
c35f9726
JG
3962 struct lttng_dynamic_array stream_rotation_positions;
3963 uint64_t next_chunk_id, stream_count = 0;
3964 enum lttng_trace_chunk_status chunk_status;
3965 const bool is_local_trace = relayd_id == -1ULL;
cd9adb8b 3966 struct consumer_relayd_sock_pair *relayd = nullptr;
c35f9726 3967 bool rotating_to_new_chunk = true;
b32703d6
JG
3968 /* Array of `struct lttng_consumer_stream *` */
3969 struct lttng_dynamic_pointer_array streams_packet_to_open;
3970 size_t stream_idx;
b99a8d42 3971
48b7cdc2
FD
3972 ASSERT_RCU_READ_LOCKED();
3973
b99a8d42
JD
3974 DBG("Consumer sample rotate position for channel %" PRIu64, key);
3975
cd9adb8b
JG
3976 lttng_dynamic_array_init(&stream_rotation_positions,
3977 sizeof(struct relayd_stream_rotation_position),
3978 nullptr);
3979 lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
c35f9726 3980
56047f5a 3981 lttng::urcu::read_lock_guard read_lock;
b99a8d42 3982
b99a8d42 3983 pthread_mutex_lock(&channel->lock);
a0377dfe 3984 LTTNG_ASSERT(channel->trace_chunk);
28ab034a 3985 chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
c35f9726
JG
3986 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
3987 ret = -1;
3988 goto end_unlock_channel;
3989 }
b99a8d42
JD
3990
3991 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
3992 ht->hash_fct(&channel->key, lttng_ht_seed),
3993 ht->match_fct,
3994 &channel->key,
3995 &iter.iter,
3996 stream,
3997 node_channel_id.node)
3998 {
a40a503f 3999 unsigned long produced_pos = 0, consumed_pos = 0;
b99a8d42
JD
4000
4001 health_code_update();
4002
4003 /*
4004 * Lock stream because we are about to change its state.
4005 */
4006 pthread_mutex_lock(&stream->lock);
4007
c35f9726
JG
4008 if (stream->trace_chunk == stream->chan->trace_chunk) {
4009 rotating_to_new_chunk = false;
4010 }
4011
a40a503f 4012 /*
c1dcb8bb 4013 * Do not flush a packet when rotating from a NULL trace
a9dde553 4014 * chunk. The stream has no means to output data, and the prior
c1dcb8bb
JG
4015 * rotation which rotated to NULL performed that side-effect
4016 * already. No new data can be produced when a stream has no
4017 * associated trace chunk (e.g. a stop followed by a rotate).
a40a503f 4018 */
a9dde553 4019 if (stream->trace_chunk) {
c1dcb8bb
JG
4020 bool flush_active;
4021
4022 if (stream->metadata_flag) {
4023 /*
4024 * Don't produce an empty metadata packet,
4025 * simply close the current one.
4026 *
4027 * Metadata is regenerated on every trace chunk
4028 * switch; there is no concern that no data was
4029 * produced.
4030 */
4031 flush_active = true;
4032 } else {
4033 /*
4034 * Only flush an empty packet if the "packet
4035 * open" could not be performed on transition
4036 * to a new trace chunk and no packets were
4037 * consumed within the chunk's lifetime.
4038 */
4039 if (stream->opened_packet_in_current_trace_chunk) {
4040 flush_active = true;
4041 } else {
4042 /*
4043 * Stream could have been full at the
4044 * time of rotation, but then have had
4045 * no activity at all.
4046 *
4047 * It is important to flush a packet
4048 * to prevent 0-length files from being
4049 * produced as most viewers choke on
4050 * them.
4051 *
4052 * Unfortunately viewers will not be
4053 * able to know that tracing was active
4054 * for this stream during this trace
4055 * chunk's lifetime.
4056 */
28ab034a
JG
4057 ret = sample_stream_positions(
4058 stream, &produced_pos, &consumed_pos);
c1dcb8bb
JG
4059 if (ret) {
4060 goto end_unlock_stream;
4061 }
4062
4063 /*
4064 * Don't flush an empty packet if data
4065 * was produced; it will be consumed
4066 * before the rotation completes.
4067 */
4068 flush_active = produced_pos != consumed_pos;
4069 if (!flush_active) {
c1dcb8bb
JG
4070 const char *trace_chunk_name;
4071 uint64_t trace_chunk_id;
4072
4073 chunk_status = lttng_trace_chunk_get_name(
28ab034a
JG
4074 stream->trace_chunk,
4075 &trace_chunk_name,
cd9adb8b 4076 nullptr);
c1dcb8bb
JG
4077 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
4078 trace_chunk_name = "none";
4079 }
4080
4081 /*
4082 * Consumer trace chunks are
4083 * never anonymous.
4084 */
4085 chunk_status = lttng_trace_chunk_get_id(
28ab034a 4086 stream->trace_chunk, &trace_chunk_id);
a0377dfe 4087 LTTNG_ASSERT(chunk_status ==
28ab034a 4088 LTTNG_TRACE_CHUNK_STATUS_OK);
c1dcb8bb
JG
4089
4090 DBG("Unable to open packet for stream during trace chunk's lifetime. "
28ab034a
JG
4091 "Flushing an empty packet to prevent an empty file from being created: "
4092 "stream id = %" PRIu64
4093 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
4094 stream->key,
4095 trace_chunk_name,
4096 trace_chunk_id);
c1dcb8bb
JG
4097 }
4098 }
4099 }
4100
a9dde553 4101 /*
c1dcb8bb
JG
4102 * Close the current packet before sampling the
4103 * ring buffer positions.
a9dde553 4104 */
c1dcb8bb 4105 ret = consumer_stream_flush_buffer(stream, flush_active);
a9dde553
MD
4106 if (ret < 0) {
4107 ERR("Failed to flush stream %" PRIu64 " during channel rotation",
28ab034a 4108 stream->key);
a9dde553
MD
4109 goto end_unlock_stream;
4110 }
b99a8d42
JD
4111 }
4112
a40a503f
MD
4113 ret = lttng_consumer_take_snapshot(stream);
4114 if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
4115 ERR("Failed to sample snapshot position during channel rotation");
b99a8d42
JD
4116 goto end_unlock_stream;
4117 }
a40a503f 4118 if (!ret) {
28ab034a 4119 ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
a40a503f
MD
4120 if (ret < 0) {
4121 ERR("Failed to sample produced position during channel rotation");
4122 goto end_unlock_stream;
4123 }
b99a8d42 4124
28ab034a 4125 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
a40a503f
MD
4126 if (ret < 0) {
4127 ERR("Failed to sample consumed position during channel rotation");
4128 goto end_unlock_stream;
4129 }
4130 }
4131 /*
4132 * Align produced position on the start-of-packet boundary of the first
4133 * packet going into the next trace chunk.
4134 */
1cbd136b 4135 produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
a40a503f 4136 if (consumed_pos == produced_pos) {
f8528c7a 4137 DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
28ab034a
JG
4138 stream->key,
4139 produced_pos,
4140 consumed_pos);
b99a8d42 4141 stream->rotate_ready = true;
f8528c7a
MD
4142 } else {
4143 DBG("Different consumed and produced positions "
28ab034a
JG
4144 "for stream %" PRIu64 " produced = %lu consumed = %lu",
4145 stream->key,
4146 produced_pos,
4147 consumed_pos);
b99a8d42 4148 }
633d0182 4149 /*
a40a503f
MD
4150 * The rotation position is based on the packet_seq_num of the
4151 * packet following the last packet that was consumed for this
4152 * stream, incremented by the offset between produced and
4153 * consumed positions. This rotation position is a lower bound
4154 * (inclusive) at which the next trace chunk starts. Since it
4155 * is a lower bound, it is OK if the packet_seq_num does not
4156 * correspond exactly to the same packet identified by the
4157 * consumed_pos, which can happen in overwrite mode.
633d0182 4158 */
a40a503f
MD
4159 if (stream->sequence_number_unavailable) {
4160 /*
4161 * Rotation should never be performed on a session which
4162 * interacts with a pre-2.8 lttng-modules, which does
4163 * not implement packet sequence number.
4164 */
4165 ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
28ab034a 4166 stream->key);
a40a503f 4167 ret = -1;
b99a8d42
JD
4168 goto end_unlock_stream;
4169 }
a40a503f 4170 stream->rotate_position = stream->last_sequence_number + 1 +
28ab034a 4171 ((produced_pos - consumed_pos) / stream->max_sb_size);
f8528c7a 4172 DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
28ab034a
JG
4173 stream->key,
4174 stream->rotate_position);
b99a8d42 4175
c35f9726 4176 if (!is_local_trace) {
633d0182
JG
4177 /*
4178 * The relay daemon control protocol expects a rotation
4179 * position as "the sequence number of the first packet
a40a503f 4180 * _after_ the current trace chunk".
633d0182 4181 */
c35f9726
JG
4182 const struct relayd_stream_rotation_position position = {
4183 .stream_id = stream->relayd_stream_id,
a40a503f 4184 .rotate_at_seq_num = stream->rotate_position,
c35f9726
JG
4185 };
4186
28ab034a
JG
4187 ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
4188 &position);
c35f9726
JG
4189 if (ret) {
4190 ERR("Failed to allocate stream rotation position");
4191 goto end_unlock_stream;
4192 }
4193 stream_count++;
4194 }
f96af312
JG
4195
4196 stream->opened_packet_in_current_trace_chunk = false;
4197
4198 if (rotating_to_new_chunk && !stream->metadata_flag) {
4199 /*
4200 * Attempt to flush an empty packet as close to the
4201 * rotation point as possible. In the event where a
4202 * stream remains inactive after the rotation point,
4203 * this ensures that the new trace chunk has a
4204 * beginning timestamp set at the begining of the
4205 * trace chunk instead of only creating an empty
4206 * packet when the trace chunk is stopped.
4207 *
4208 * This indicates to the viewers that the stream
4209 * was being recorded, but more importantly it
4210 * allows viewers to determine a useable trace
4211 * intersection.
4212 *
4213 * This presents a problem in the case where the
4214 * ring-buffer is completely full.
4215 *
4216 * Consider the following scenario:
4217 * - The consumption of data is slow (slow network,
4218 * for instance),
4219 * - The ring buffer is full,
4220 * - A rotation is initiated,
4221 * - The flush below does nothing (no space left to
4222 * open a new packet),
4223 * - The other streams rotate very soon, and new
4224 * data is produced in the new chunk,
4225 * - This stream completes its rotation long after the
4226 * rotation was initiated
4227 * - The session is stopped before any event can be
4228 * produced in this stream's buffers.
4229 *
4230 * The resulting trace chunk will have a single packet
4231 * temporaly at the end of the trace chunk for this
4232 * stream making the stream intersection more narrow
4233 * than it should be.
4234 *
4235 * To work-around this, an empty flush is performed
4236 * after the first consumption of a packet during a
4237 * rotation if open_packet fails. The idea is that
4238 * consuming a packet frees enough space to switch
4239 * packets in this scenario and allows the tracer to
4240 * "stamp" the beginning of the new trace chunk at the
4241 * earliest possible point.
b32703d6
JG
4242 *
4243 * The packet open is performed after the channel
4244 * rotation to ensure that no attempt to open a packet
4245 * is performed in a stream that has no active trace
4246 * chunk.
f96af312 4247 */
28ab034a
JG
4248 ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
4249 stream);
b32703d6
JG
4250 if (ret) {
4251 PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
f96af312
JG
4252 ret = -1;
4253 goto end_unlock_stream;
f96af312
JG
4254 }
4255 }
4256
b99a8d42
JD
4257 pthread_mutex_unlock(&stream->lock);
4258 }
cd9adb8b 4259 stream = nullptr;
b99a8d42 4260
b32703d6
JG
4261 if (!is_local_trace) {
4262 relayd = consumer_find_relayd(relayd_id);
4263 if (!relayd) {
4264 ERR("Failed to find relayd %" PRIu64, relayd_id);
4265 ret = -1;
4266 goto end_unlock_channel;
4267 }
c35f9726 4268
b32703d6 4269 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a
JG
4270 ret = relayd_rotate_streams(&relayd->control_sock,
4271 stream_count,
cd9adb8b 4272 rotating_to_new_chunk ? &next_chunk_id : nullptr,
28ab034a
JG
4273 (const struct relayd_stream_rotation_position *)
4274 stream_rotation_positions.buffer.data);
b32703d6
JG
4275 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4276 if (ret < 0) {
4277 ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
28ab034a 4278 relayd->net_seq_idx);
b32703d6
JG
4279 lttng_consumer_cleanup_relayd(relayd);
4280 goto end_unlock_channel;
4281 }
c35f9726
JG
4282 }
4283
b32703d6 4284 for (stream_idx = 0;
28ab034a
JG
4285 stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
4286 stream_idx++) {
b32703d6
JG
4287 enum consumer_stream_open_packet_status status;
4288
97535efa 4289 stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
28ab034a 4290 &streams_packet_to_open, stream_idx);
b32703d6
JG
4291
4292 pthread_mutex_lock(&stream->lock);
4293 status = consumer_stream_open_packet(stream);
4294 pthread_mutex_unlock(&stream->lock);
4295 switch (status) {
4296 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
4297 DBG("Opened a packet after a rotation: stream id = %" PRIu64
4298 ", channel name = %s, session id = %" PRIu64,
28ab034a
JG
4299 stream->key,
4300 stream->chan->name,
4301 stream->chan->session_id);
b32703d6
JG
4302 break;
4303 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
4304 /*
4305 * Can't open a packet as there is no space left
4306 * in the buffer. A new packet will be opened
4307 * once one has been consumed.
4308 */
4309 DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
4310 ", channel name = %s, session id = %" PRIu64,
28ab034a
JG
4311 stream->key,
4312 stream->chan->name,
4313 stream->chan->session_id);
b32703d6
JG
4314 break;
4315 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
4316 /* Logged by callee. */
4317 ret = -1;
7a86c13d 4318 goto end_unlock_channel;
b32703d6
JG
4319 default:
4320 abort();
4321 }
c35f9726
JG
4322 }
4323
b32703d6 4324 pthread_mutex_unlock(&channel->lock);
b99a8d42
JD
4325 ret = 0;
4326 goto end;
4327
4328end_unlock_stream:
4329 pthread_mutex_unlock(&stream->lock);
c35f9726 4330end_unlock_channel:
b99a8d42
JD
4331 pthread_mutex_unlock(&channel->lock);
4332end:
c35f9726 4333 lttng_dynamic_array_reset(&stream_rotation_positions);
b32703d6 4334 lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
b99a8d42
JD
4335 return ret;
4336}
4337
28ab034a 4338static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4339{
4340 int ret = 0;
4341 unsigned long consumed_pos_before, consumed_pos_after;
4342
4343 ret = lttng_consumer_sample_snapshot_positions(stream);
4344 if (ret < 0) {
4345 ERR("Taking snapshot positions");
4346 goto end;
4347 }
4348
4349 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4350 if (ret < 0) {
4351 ERR("Consumed snapshot position");
4352 goto end;
4353 }
4354
fa29bfbf 4355 switch (the_consumer_data.type) {
5f3aff8b
MD
4356 case LTTNG_CONSUMER_KERNEL:
4357 ret = kernctl_buffer_clear(stream->wait_fd);
4358 if (ret < 0) {
96393977 4359 ERR("Failed to clear kernel stream (ret = %d)", ret);
5f3aff8b
MD
4360 goto end;
4361 }
4362 break;
4363 case LTTNG_CONSUMER32_UST:
4364 case LTTNG_CONSUMER64_UST:
881fc67f
MD
4365 ret = lttng_ustconsumer_clear_buffer(stream);
4366 if (ret < 0) {
4367 ERR("Failed to clear ust stream (ret = %d)", ret);
4368 goto end;
4369 }
5f3aff8b
MD
4370 break;
4371 default:
4372 ERR("Unknown consumer_data type");
4373 abort();
4374 }
4375
4376 ret = lttng_consumer_sample_snapshot_positions(stream);
4377 if (ret < 0) {
4378 ERR("Taking snapshot positions");
4379 goto end;
4380 }
4381 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4382 if (ret < 0) {
4383 ERR("Consumed snapshot position");
4384 goto end;
4385 }
4386 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4387end:
4388 return ret;
4389}
4390
28ab034a 4391static int consumer_clear_stream(struct lttng_consumer_stream *stream)
5f3aff8b
MD
4392{
4393 int ret;
4394
cd9adb8b 4395 ret = consumer_stream_flush_buffer(stream, true);
5f3aff8b 4396 if (ret < 0) {
28ab034a 4397 ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4398 ret = LTTCOMM_CONSUMERD_FATAL;
4399 goto error;
4400 }
4401
4402 ret = consumer_clear_buffer(stream);
4403 if (ret < 0) {
28ab034a 4404 ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
5f3aff8b
MD
4405 ret = LTTCOMM_CONSUMERD_FATAL;
4406 goto error;
4407 }
4408
4409 ret = LTTCOMM_CONSUMERD_SUCCESS;
4410error:
4411 return ret;
4412}
4413
28ab034a 4414static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
5f3aff8b
MD
4415{
4416 int ret;
4417 struct lttng_consumer_stream *stream;
4418
56047f5a 4419 lttng::urcu::read_lock_guard read_lock;
5f3aff8b 4420 pthread_mutex_lock(&channel->lock);
28ab034a 4421 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
5f3aff8b
MD
4422 health_code_update();
4423 pthread_mutex_lock(&stream->lock);
4424 ret = consumer_clear_stream(stream);
4425 if (ret) {
4426 goto error_unlock;
4427 }
4428 pthread_mutex_unlock(&stream->lock);
4429 }
4430 pthread_mutex_unlock(&channel->lock);
5f3aff8b
MD
4431 return 0;
4432
4433error_unlock:
4434 pthread_mutex_unlock(&stream->lock);
4435 pthread_mutex_unlock(&channel->lock);
5f3aff8b
MD
4436 return ret;
4437}
4438
02d02e31
JD
4439/*
4440 * Check if a stream is ready to be rotated after extracting it.
4441 *
4442 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4443 * error. Stream lock must be held.
4444 */
4445int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4446{
28ab034a
JG
4447 DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
4448 " last_sequence_number %" PRIu64,
4449 stream->key,
4450 stream->rotate_ready,
4451 stream->rotate_position,
4452 stream->last_sequence_number);
02d02e31 4453 if (stream->rotate_ready) {
a40a503f 4454 return 1;
02d02e31
JD
4455 }
4456
4457 /*
a40a503f
MD
4458 * If packet seq num is unavailable, it means we are interacting
4459 * with a pre-2.8 lttng-modules which does not implement the
4460 * sequence number. Rotation should never be used by sessiond in this
4461 * scenario.
02d02e31 4462 */
a40a503f
MD
4463 if (stream->sequence_number_unavailable) {
4464 ERR("Internal error: rotation used on stream %" PRIu64
28ab034a
JG
4465 " with unavailable sequence number",
4466 stream->key);
a40a503f 4467 return -1;
02d02e31
JD
4468 }
4469
28ab034a 4470 if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
a40a503f 4471 return 0;
02d02e31
JD
4472 }
4473
a40a503f
MD
4474 /*
4475 * Rotate position not reached yet. The stream rotate position is
4476 * the position of the next packet belonging to the next trace chunk,
4477 * but consumerd considers rotation ready when reaching the last
4478 * packet of the current chunk, hence the "rotate_position - 1".
4479 */
f8528c7a 4480
28ab034a
JG
4481 DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
4482 " rotate_position %" PRIu64,
4483 stream->key,
4484 stream->last_sequence_number,
4485 stream->rotate_position);
a40a503f
MD
4486 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4487 return 1;
02d02e31 4488 }
02d02e31 4489
a40a503f 4490 return 0;
02d02e31
JD
4491}
4492
d73bf3d7
JD
4493/*
4494 * Reset the state for a stream after a rotation occurred.
4495 */
4496void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
4497{
28ab034a 4498 DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
a40a503f 4499 stream->rotate_position = -1ULL;
d73bf3d7
JD
4500 stream->rotate_ready = false;
4501}
4502
4503/*
4504 * Perform the rotation a local stream file.
4505 */
28ab034a 4506static int rotate_local_stream(struct lttng_consumer_stream *stream)
d73bf3d7 4507{
d2956687 4508 int ret = 0;
d73bf3d7 4509
d2956687 4510 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
28ab034a
JG
4511 stream->key,
4512 stream->chan->key);
d73bf3d7 4513 stream->tracefile_size_current = 0;
d2956687 4514 stream->tracefile_count_current = 0;
d73bf3d7 4515
d2956687
JG
4516 if (stream->out_fd >= 0) {
4517 ret = close(stream->out_fd);
4518 if (ret) {
4519 PERROR("Failed to close stream out_fd of channel \"%s\"",
28ab034a 4520 stream->chan->name);
d2956687
JG
4521 }
4522 stream->out_fd = -1;
4523 }
d73bf3d7 4524
d2956687 4525 if (stream->index_file) {
d73bf3d7 4526 lttng_index_file_put(stream->index_file);
cd9adb8b 4527 stream->index_file = nullptr;
d73bf3d7
JD
4528 }
4529
d2956687
JG
4530 if (!stream->trace_chunk) {
4531 goto end;
4532 }
d73bf3d7 4533
d2956687 4534 ret = consumer_stream_create_output_files(stream, true);
d73bf3d7
JD
4535end:
4536 return ret;
d73bf3d7
JD
4537}
4538
d73bf3d7
JD
4539/*
4540 * Performs the stream rotation for the rotate session feature if needed.
d2956687 4541 * It must be called with the channel and stream locks held.
d73bf3d7
JD
4542 *
4543 * Return 0 on success, a negative number of error.
4544 */
f46376a1 4545int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
d73bf3d7
JD
4546{
4547 int ret;
4548
4549 DBG("Consumer rotate stream %" PRIu64, stream->key);
4550
d2956687
JG
4551 /*
4552 * Update the stream's 'current' chunk to the session's (channel)
4553 * now-current chunk.
4554 */
4555 lttng_trace_chunk_put(stream->trace_chunk);
4556 if (stream->chan->trace_chunk == stream->trace_chunk) {
4557 /*
4558 * A channel can be rotated and not have a "next" chunk
4559 * to transition to. In that case, the channel's "current chunk"
4560 * has not been closed yet, but it has not been updated to
4561 * a "next" trace chunk either. Hence, the stream, like its
4562 * parent channel, becomes part of no chunk and can't output
4563 * anything until a new trace chunk is created.
4564 */
cd9adb8b 4565 stream->trace_chunk = nullptr;
28ab034a 4566 } else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
d2956687
JG
4567 ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
4568 ret = -1;
4569 goto error;
4570 } else {
4571 /*
4572 * Update the stream's trace chunk to its parent channel's
4573 * current trace chunk.
4574 */
4575 stream->trace_chunk = stream->chan->trace_chunk;
4576 }
4577
c35f9726 4578 if (stream->net_seq_idx == (uint64_t) -1ULL) {
f46376a1 4579 ret = rotate_local_stream(stream);
c35f9726
JG
4580 if (ret < 0) {
4581 ERR("Failed to rotate stream, ret = %i", ret);
4582 goto error;
4583 }
d73bf3d7
JD
4584 }
4585
d2956687
JG
4586 if (stream->metadata_flag && stream->trace_chunk) {
4587 /*
4588 * If the stream has transitioned to a new trace
4589 * chunk, the metadata should be re-dumped to the
4590 * newest chunk.
4591 *
4592 * However, it is possible for a stream to transition to
4593 * a "no-chunk" state. This can happen if a rotation
4594 * occurs on an inactive session. In such cases, the metadata
4595 * regeneration will happen when the next trace chunk is
4596 * created.
4597 */
4598 ret = consumer_metadata_stream_dump(stream);
4599 if (ret) {
4600 goto error;
d73bf3d7
JD
4601 }
4602 }
4603 lttng_consumer_reset_stream_rotate_state(stream);
4604
4605 ret = 0;
4606
4607error:
4608 return ret;
4609}
4610
b99a8d42
JD
4611/*
4612 * Rotate all the ready streams now.
4613 *
4614 * This is especially important for low throughput streams that have already
4615 * been consumed, we cannot wait for their next packet to perform the
4616 * rotation.
92b7a7f8
MD
4617 * Need to be called with RCU read-side lock held to ensure existence of
4618 * channel.
b99a8d42
JD
4619 *
4620 * Returns 0 on success, < 0 on error
4621 */
28ab034a 4622int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
b99a8d42
JD
4623{
4624 int ret;
b99a8d42
JD
4625 struct lttng_consumer_stream *stream;
4626 struct lttng_ht_iter iter;
fa29bfbf 4627 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
b99a8d42 4628
48b7cdc2
FD
4629 ASSERT_RCU_READ_LOCKED();
4630
56047f5a 4631 lttng::urcu::read_lock_guard read_lock;
b99a8d42
JD
4632
4633 DBG("Consumer rotate ready streams in channel %" PRIu64, key);
4634
b99a8d42 4635 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
4636 ht->hash_fct(&channel->key, lttng_ht_seed),
4637 ht->match_fct,
4638 &channel->key,
4639 &iter.iter,
4640 stream,
4641 node_channel_id.node)
4642 {
b99a8d42
JD
4643 health_code_update();
4644
d2956687 4645 pthread_mutex_lock(&stream->chan->lock);
b99a8d42
JD
4646 pthread_mutex_lock(&stream->lock);
4647
4648 if (!stream->rotate_ready) {
4649 pthread_mutex_unlock(&stream->lock);
d2956687 4650 pthread_mutex_unlock(&stream->chan->lock);
b99a8d42
JD
4651 continue;
4652 }
4653 DBG("Consumer rotate ready stream %" PRIu64, stream->key);
4654
f46376a1 4655 ret = lttng_consumer_rotate_stream(stream);
b99a8d42 4656 pthread_mutex_unlock(&stream->lock);
d2956687 4657 pthread_mutex_unlock(&stream->chan->lock);
b99a8d42
JD
4658 if (ret) {
4659 goto end;
4660 }
4661 }
4662
4663 ret = 0;
4664
4665end:
b99a8d42
JD
4666 return ret;
4667}
4668
28ab034a
JG
4669enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
4670 const lttng_uuid& sessiond_uuid)
00fb02ac 4671{
d2956687 4672 enum lttcomm_return_code ret;
c70636a7 4673 char uuid_str[LTTNG_UUID_STR_LEN];
00fb02ac 4674
d2956687
JG
4675 if (ctx->sessiond_uuid.is_set) {
4676 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
00fb02ac
JD
4677 goto end;
4678 }
4679
d2956687 4680 ctx->sessiond_uuid.is_set = true;
328c2fe7 4681 ctx->sessiond_uuid.value = sessiond_uuid;
d2956687
JG
4682 ret = LTTCOMM_CONSUMERD_SUCCESS;
4683 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4684 DBG("Received session daemon UUID: %s", uuid_str);
00fb02ac
JD
4685end:
4686 return ret;
4687}
4688
28ab034a
JG
4689enum lttcomm_return_code
4690lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
4691 uint64_t session_id,
4692 uint64_t chunk_id,
4693 time_t chunk_creation_timestamp,
4694 const char *chunk_override_name,
4695 const struct lttng_credentials *credentials,
4696 struct lttng_directory_handle *chunk_directory_handle)
00fb02ac
JD
4697{
4698 int ret;
d2956687 4699 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
cd9adb8b 4700 struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
d2956687
JG
4701 enum lttng_trace_chunk_status chunk_status;
4702 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
4703 char creation_timestamp_buffer[ISO8601_STR_LEN];
4704 const char *relayd_id_str = "(none)";
4705 const char *creation_timestamp_str;
4706 struct lttng_ht_iter iter;
4707 struct lttng_consumer_channel *channel;
92816cc3 4708
d2956687
JG
4709 if (relayd_id) {
4710 /* Only used for logging purposes. */
28ab034a 4711 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
4712 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
4713 relayd_id_str = relayd_id_buffer;
4714 } else {
4715 relayd_id_str = "(formatting error)";
4716 }
d01ef216 4717 }
d2956687 4718
d01ef216 4719 /* Local protocol error. */
a0377dfe 4720 LTTNG_ASSERT(chunk_creation_timestamp);
d2956687 4721 ret = time_to_iso8601_str(chunk_creation_timestamp,
28ab034a
JG
4722 creation_timestamp_buffer,
4723 sizeof(creation_timestamp_buffer));
4724 creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";
d2956687
JG
4725
4726 DBG("Consumer create trace chunk command: relay_id = %s"
28ab034a
JG
4727 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
4728 ", chunk_creation_timestamp = %s",
4729 relayd_id_str,
4730 session_id,
4731 chunk_id,
4732 chunk_override_name ?: "(none)",
4733 creation_timestamp_str);
92816cc3
JG
4734
4735 /*
d2956687
JG
4736 * The trace chunk registry, as used by the consumer daemon, implicitly
4737 * owns the trace chunks. This is only needed in the consumer since
4738 * the consumer has no notion of a session beyond session IDs being
4739 * used to identify other objects.
4740 *
4741 * The lttng_trace_chunk_registry_publish() call below provides a
4742 * reference which is not released; it implicitly becomes the session
4743 * daemon's reference to the chunk in the consumer daemon.
4744 *
4745 * The lifetime of trace chunks in the consumer daemon is managed by
4746 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
4747 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
92816cc3 4748 */
cd9adb8b 4749 created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
d2956687
JG
4750 if (!created_chunk) {
4751 ERR("Failed to create trace chunk");
4752 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4753 goto error;
d2956687 4754 }
92816cc3 4755
d2956687 4756 if (chunk_override_name) {
28ab034a 4757 chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
d2956687
JG
4758 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4759 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4760 goto error;
92816cc3
JG
4761 }
4762 }
4763
d2956687 4764 if (chunk_directory_handle) {
28ab034a 4765 chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
d2956687
JG
4766 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4767 ERR("Failed to set trace chunk credentials");
4768 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4769 goto error;
d2956687
JG
4770 }
4771 /*
4772 * The consumer daemon has no ownership of the chunk output
4773 * directory.
4774 */
28ab034a 4775 chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
cd9adb8b 4776 chunk_directory_handle = nullptr;
d2956687
JG
4777 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4778 ERR("Failed to set trace chunk's directory handle");
4779 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4780 goto error;
92816cc3
JG
4781 }
4782 }
4783
d2956687 4784 published_chunk = lttng_trace_chunk_registry_publish_chunk(
28ab034a 4785 the_consumer_data.chunk_registry, session_id, created_chunk);
d2956687 4786 lttng_trace_chunk_put(created_chunk);
cd9adb8b 4787 created_chunk = nullptr;
d2956687
JG
4788 if (!published_chunk) {
4789 ERR("Failed to publish trace chunk");
4790 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4791 goto error;
d88744a4
JD
4792 }
4793
28ab034a 4794 {
56047f5a
JG
4795 lttng::urcu::read_lock_guard read_lock;
4796 cds_lfht_for_each_entry_duplicate(
4797 the_consumer_data.channels_by_session_id_ht->ht,
4798 the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
4799 lttng_ht_seed),
4800 the_consumer_data.channels_by_session_id_ht->match_fct,
4801 &session_id,
4802 &iter.iter,
4803 channel,
4804 channels_by_session_id_ht_node.node)
4805 {
4806 ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
4807 if (ret) {
4808 /*
4809 * Roll-back the creation of this chunk.
4810 *
4811 * This is important since the session daemon will
4812 * assume that the creation of this chunk failed and
4813 * will never ask for it to be closed, resulting
4814 * in a leak and an inconsistent state for some
4815 * channels.
4816 */
4817 enum lttcomm_return_code close_ret;
4818 char path[LTTNG_PATH_MAX];
4819
4820 DBG("Failed to set new trace chunk on existing channels, rolling back");
4821 close_ret =
4822 lttng_consumer_close_trace_chunk(relayd_id,
4823 session_id,
4824 chunk_id,
4825 chunk_creation_timestamp,
4826 nullptr,
4827 path);
4828 if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
4829 ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
4830 ", chunk_id = %" PRIu64,
4831 session_id,
4832 chunk_id);
4833 }
d2956687 4834
56047f5a
JG
4835 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
4836 break;
d2956687 4837 }
d2956687 4838 }
a1ae2ea5
JD
4839 }
4840
e5add6d0
JG
4841 if (relayd_id) {
4842 struct consumer_relayd_sock_pair *relayd;
4843
4844 relayd = consumer_find_relayd(*relayd_id);
4845 if (relayd) {
4846 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 4847 ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
e5add6d0
JG
4848 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4849 } else {
4850 ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
4851 }
4852
4853 if (!relayd || ret) {
4854 enum lttcomm_return_code close_ret;
ecd1a12f 4855 char path[LTTNG_PATH_MAX];
e5add6d0
JG
4856
4857 close_ret = lttng_consumer_close_trace_chunk(relayd_id,
28ab034a
JG
4858 session_id,
4859 chunk_id,
4860 chunk_creation_timestamp,
cd9adb8b 4861 nullptr,
28ab034a 4862 path);
e5add6d0 4863 if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
28ab034a
JG
4864 ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
4865 ", chunk_id = %" PRIu64,
4866 session_id,
4867 chunk_id);
e5add6d0
JG
4868 }
4869
4870 ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
7ea24db3 4871 goto error_unlock;
e5add6d0
JG
4872 }
4873 }
7ea24db3 4874error_unlock:
7ea24db3 4875error:
d2956687
JG
4876 /* Release the reference returned by the "publish" operation. */
4877 lttng_trace_chunk_put(published_chunk);
9bb5f1f8 4878 lttng_trace_chunk_put(created_chunk);
d2956687 4879 return ret_code;
a1ae2ea5
JD
4880}
4881
28ab034a
JG
4882enum lttcomm_return_code
4883lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
4884 uint64_t session_id,
4885 uint64_t chunk_id,
4886 time_t chunk_close_timestamp,
4887 const enum lttng_trace_chunk_command_type *close_command,
4888 char *path)
a1ae2ea5 4889{
d2956687
JG
4890 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
4891 struct lttng_trace_chunk *chunk;
4892 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
4893 const char *relayd_id_str = "(none)";
bbc4768c 4894 const char *close_command_name = "none";
d2956687
JG
4895 struct lttng_ht_iter iter;
4896 struct lttng_consumer_channel *channel;
4897 enum lttng_trace_chunk_status chunk_status;
a1ae2ea5 4898
d2956687
JG
4899 if (relayd_id) {
4900 int ret;
4901
4902 /* Only used for logging purposes. */
28ab034a 4903 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
4904 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
4905 relayd_id_str = relayd_id_buffer;
4906 } else {
4907 relayd_id_str = "(formatting error)";
4908 }
bbc4768c
JG
4909 }
4910 if (close_command) {
28ab034a 4911 close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
bbc4768c 4912 }
d2956687
JG
4913
4914 DBG("Consumer close trace chunk command: relayd_id = %s"
28ab034a
JG
4915 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
4916 relayd_id_str,
4917 session_id,
4918 chunk_id,
4919 close_command_name);
bbc4768c 4920
d2956687 4921 chunk = lttng_trace_chunk_registry_find_chunk(
28ab034a 4922 the_consumer_data.chunk_registry, session_id, chunk_id);
bbc4768c 4923 if (!chunk) {
28ab034a
JG
4924 ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
4925 session_id,
4926 chunk_id);
d2956687 4927 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
a1ae2ea5
JD
4928 goto end;
4929 }
4930
28ab034a 4931 chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
d2956687
JG
4932 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4933 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4934 goto end;
45f1d9a1 4935 }
bbc4768c
JG
4936
4937 if (close_command) {
28ab034a 4938 chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
bbc4768c
JG
4939 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4940 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4941 goto end;
4942 }
4943 }
a1ae2ea5 4944
d2956687
JG
4945 /*
4946 * chunk is now invalid to access as we no longer hold a reference to
4947 * it; it is only kept around to compare it (by address) to the
4948 * current chunk found in the session's channels.
4949 */
56047f5a
JG
4950 {
4951 lttng::urcu::read_lock_guard read_lock;
4952 cds_lfht_for_each_entry (
4953 the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
4954 int ret;
a1ae2ea5 4955
d2956687 4956 /*
56047f5a
JG
4957 * Only change the channel's chunk to NULL if it still
4958 * references the chunk being closed. The channel may
4959 * reference a newer channel in the case of a session
4960 * rotation. When a session rotation occurs, the "next"
4961 * chunk is created before the "current" chunk is closed.
d2956687 4962 */
56047f5a
JG
4963 if (channel->trace_chunk != chunk) {
4964 continue;
4965 }
4966 ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
4967 if (ret) {
4968 /*
4969 * Attempt to close the chunk on as many channels as
4970 * possible.
4971 */
4972 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4973 }
d2956687 4974 }
a1ae2ea5 4975 }
bbc4768c
JG
4976 if (relayd_id) {
4977 int ret;
4978 struct consumer_relayd_sock_pair *relayd;
4979
4980 relayd = consumer_find_relayd(*relayd_id);
4981 if (relayd) {
4982 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 4983 ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
bbc4768c
JG
4984 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4985 } else {
28ab034a 4986 ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
bbc4768c
JG
4987 }
4988
4989 if (!relayd || ret) {
4990 ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
4991 goto error_unlock;
4992 }
4993 }
4994error_unlock:
d2956687 4995end:
bbc4768c
JG
4996 /*
4997 * Release the reference returned by the "find" operation and
4998 * the session daemon's implicit reference to the chunk.
4999 */
5000 lttng_trace_chunk_put(chunk);
5001 lttng_trace_chunk_put(chunk);
5002
d2956687 5003 return ret_code;
a1ae2ea5 5004}
3654ed19 5005
28ab034a
JG
5006enum lttcomm_return_code
5007lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
3654ed19 5008{
c35f9726 5009 int ret;
d2956687 5010 enum lttcomm_return_code ret_code;
d2956687
JG
5011 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
5012 const char *relayd_id_str = "(none)";
c35f9726 5013 const bool is_local_trace = !relayd_id;
cd9adb8b 5014 struct consumer_relayd_sock_pair *relayd = nullptr;
6b584c2e 5015 bool chunk_exists_local, chunk_exists_remote;
56047f5a 5016 lttng::urcu::read_lock_guard read_lock;
d2956687
JG
5017
5018 if (relayd_id) {
d2956687 5019 /* Only used for logging purposes. */
28ab034a 5020 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
d2956687
JG
5021 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
5022 relayd_id_str = relayd_id_buffer;
5023 } else {
5024 relayd_id_str = "(formatting error)";
5025 }
d01ef216 5026 }
d2956687
JG
5027
5028 DBG("Consumer trace chunk exists command: relayd_id = %s"
28ab034a
JG
5029 ", chunk_id = %" PRIu64,
5030 relayd_id_str,
5031 chunk_id);
6b584c2e 5032 ret = lttng_trace_chunk_registry_chunk_exists(
28ab034a 5033 the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
6b584c2e
JG
5034 if (ret) {
5035 /* Internal error. */
5036 ERR("Failed to query the existence of a trace chunk");
5037 ret_code = LTTCOMM_CONSUMERD_FATAL;
13e3b280 5038 goto end;
6b584c2e 5039 }
28ab034a 5040 DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
6b584c2e 5041 if (chunk_exists_local) {
c35f9726 5042 ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
c35f9726
JG
5043 goto end;
5044 } else if (is_local_trace) {
5045 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5046 goto end;
5047 }
5048
c35f9726
JG
5049 relayd = consumer_find_relayd(*relayd_id);
5050 if (!relayd) {
5051 ERR("Failed to find relayd %" PRIu64, *relayd_id);
5052 ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
5053 goto end_rcu_unlock;
5054 }
5055 DBG("Looking up existence of trace chunk on relay daemon");
5056 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
28ab034a 5057 ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
c35f9726
JG
5058 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
5059 if (ret < 0) {
5060 ERR("Failed to look-up the existence of trace chunk on relay daemon");
5061 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
5062 goto end_rcu_unlock;
5063 }
5064
28ab034a
JG
5065 ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
5066 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5067 DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
d2956687 5068
c35f9726 5069end_rcu_unlock:
c35f9726 5070end:
d2956687 5071 return ret_code;
3654ed19 5072}
5f3aff8b 5073
28ab034a 5074static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
5f3aff8b
MD
5075{
5076 struct lttng_ht *ht;
5077 struct lttng_consumer_stream *stream;
5078 struct lttng_ht_iter iter;
5079 int ret;
5080
fa29bfbf 5081 ht = the_consumer_data.stream_per_chan_id_ht;
5f3aff8b 5082
56047f5a 5083 lttng::urcu::read_lock_guard read_lock;
5f3aff8b 5084 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
5085 ht->hash_fct(&channel->key, lttng_ht_seed),
5086 ht->match_fct,
5087 &channel->key,
5088 &iter.iter,
5089 stream,
5090 node_channel_id.node)
5091 {
5f3aff8b
MD
5092 /*
5093 * Protect against teardown with mutex.
5094 */
5095 pthread_mutex_lock(&stream->lock);
5096 if (cds_lfht_is_node_deleted(&stream->node.node)) {
5097 goto next;
5098 }
5099 ret = consumer_clear_stream(stream);
5100 if (ret) {
5101 goto error_unlock;
5102 }
5103 next:
5104 pthread_mutex_unlock(&stream->lock);
5105 }
5f3aff8b
MD
5106 return LTTCOMM_CONSUMERD_SUCCESS;
5107
5108error_unlock:
5109 pthread_mutex_unlock(&stream->lock);
5f3aff8b
MD
5110 return ret;
5111}
5112
5113int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
5114{
5115 int ret;
5116
5117 DBG("Consumer clear channel %" PRIu64, channel->key);
5118
5119 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
5120 /*
5121 * Nothing to do for the metadata channel/stream.
5122 * Snapshot mechanism already take care of the metadata
5123 * handling/generation, and monitored channels only need to
5124 * have their data stream cleared..
5125 */
5126 ret = LTTCOMM_CONSUMERD_SUCCESS;
5127 goto end;
5128 }
5129
5130 if (!channel->monitor) {
5131 ret = consumer_clear_unmonitored_channel(channel);
5132 } else {
5133 ret = consumer_clear_monitored_channel(channel);
5134 }
5135end:
5136 return ret;
5137}
04ed9e10 5138
28ab034a 5139enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
04ed9e10
JG
5140{
5141 struct lttng_consumer_stream *stream;
5142 enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
5143
5144 if (channel->metadata_stream) {
5145 ERR("Open channel packets command attempted on a metadata channel");
5146 ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
5147 goto end;
5148 }
5149
56047f5a
JG
5150 {
5151 lttng::urcu::read_lock_guard read_lock;
5152 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
5153 enum consumer_stream_open_packet_status status;
04ed9e10 5154
56047f5a
JG
5155 pthread_mutex_lock(&stream->lock);
5156 if (cds_lfht_is_node_deleted(&stream->node.node)) {
5157 goto next;
5158 }
04ed9e10 5159
56047f5a
JG
5160 status = consumer_stream_open_packet(stream);
5161 switch (status) {
5162 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
5163 DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
5164 ", channel name = %s, session id = %" PRIu64,
5165 stream->key,
5166 stream->chan->name,
5167 stream->chan->session_id);
5168 stream->opened_packet_in_current_trace_chunk = true;
5169 break;
5170 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
5171 DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
5172 ", channel name = %s, session id = %" PRIu64,
5173 stream->key,
5174 stream->chan->name,
5175 stream->chan->session_id);
5176 break;
5177 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
5178 /*
5179 * Only unexpected internal errors can lead to this
5180 * failing. Report an unknown error.
5181 */
5182 ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
5183 ", channel id = %" PRIu64 ", channel name = %s"
5184 ", session id = %" PRIu64,
5185 stream->key,
5186 channel->key,
5187 channel->name,
5188 channel->session_id);
5189 ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
5190 goto error_unlock;
5191 default:
5192 abort();
5193 }
04ed9e10 5194
56047f5a
JG
5195 next:
5196 pthread_mutex_unlock(&stream->lock);
5197 }
04ed9e10 5198 }
04ed9e10 5199end_rcu_unlock:
04ed9e10
JG
5200end:
5201 return ret;
5202
5203error_unlock:
5204 pthread_mutex_unlock(&stream->lock);
5205 goto end_rcu_unlock;
5206}
881fc67f
MD
5207
5208void lttng_consumer_sigbus_handle(void *addr)
5209{
5210 lttng_ustconsumer_sigbus_handle(addr);
5211}
This page took 0.452126 seconds and 4 git commands to generate.