[lttng-tools.git] / src / common / consumer / consumer.c
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
00e2e675 4 * 2012 - David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
d14d33bf
AM
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
3bd1e081 9 *
d14d33bf
AM
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
3bd1e081 14 *
d14d33bf
AM
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
18 */
19
6c1c0768 20#define _LGPL_SOURCE
3bd1e081 21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
77c7c900 30#include <inttypes.h>
331744e3 31#include <signal.h>
3bd1e081 32
51a9e1c7 33#include <bin/lttng-consumerd/health-consumerd.h>
990570ed 34#include <common/common.h>
fb3a43a9 35#include <common/utils.h>
d2956687 36#include <common/time.h>
fb3a43a9 37#include <common/compat/poll.h>
f263b7fd 38#include <common/compat/endian.h>
309167d2 39#include <common/index/index.h>
10a8a223 40#include <common/kernel-ctl/kernel-ctl.h>
00e2e675 41#include <common/sessiond-comm/relayd.h>
10a8a223
DG
42#include <common/sessiond-comm/sessiond-comm.h>
43#include <common/kernel-consumer/kernel-consumer.h>
00e2e675 44#include <common/relayd/relayd.h>
10a8a223 45#include <common/ust-consumer/ust-consumer.h>
c8fea79c
JR
46#include <common/consumer/consumer-timer.h>
47#include <common/consumer/consumer.h>
48#include <common/consumer/consumer-stream.h>
49#include <common/consumer/consumer-testpoint.h>
50#include <common/align.h>
5feafd41 51#include <common/consumer/consumer-metadata-cache.h>
d2956687
JG
52#include <common/trace-chunk.h>
53#include <common/trace-chunk-registry.h>
54#include <common/string-utils/format.h>
c35f9726 55#include <common/dynamic-array.h>
3bd1e081
MD
56
57struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
58 .stream_count = 0,
59 .need_update = 1,
60 .type = LTTNG_CONSUMER_UNKNOWN,
61};
62
d8ef542d
MD
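/*
 * Actions sent over the consumer channel pipe: ADD carries a channel pointer,
 * DEL carries only the channel key and QUIT carries neither (see struct
 * consumer_channel_msg below).
 */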
63enum consumer_channel_action {
64 CONSUMER_CHANNEL_ADD,
a0cbdd2e 65 CONSUMER_CHANNEL_DEL,
d8ef542d
MD
66 CONSUMER_CHANNEL_QUIT,
67};
68
69struct consumer_channel_msg {
70 enum consumer_channel_action action;
a0cbdd2e
MD
71 struct lttng_consumer_channel *chan; /* add */
72 uint64_t key; /* del */
d8ef542d
MD
73};
74
80957876 75/* Flag used to temporarily pause data consumption from testpoints. */
cf0bcb51
JG
76int data_consumption_paused;
77
3bd1e081
MD
78/*
79 * Flag to inform the polling thread to quit when all fds have hung up. Updated by
80 * the consumer_thread_receive_fds when it notices that all fds have hung up.
81 * Also updated by the signal handler (consumer_should_exit()). Read by the
82 * polling threads.
83 */
10211f5c 84int consumer_quit;
3bd1e081 85
43c34bc3 86/*
43c34bc3
DG
87 * Global hash table containing respectively metadata and data streams. The
88 * stream element in this ht should only be updated by the metadata poll thread
89 * for the metadata and the data poll thread for the data.
90 */
40dc48e0
DG
91static struct lttng_ht *metadata_ht;
92static struct lttng_ht *data_ht;
43c34bc3 93
5da88b0f
MD
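/*
 * Return the default trace domain directory name (DEFAULT_KERNEL_TRACE_DIR or
 * DEFAULT_UST_TRACE_DIR) matching the consumer type; abort on an unknown type.
 */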
94static const char *get_consumer_domain(void)
95{
96 switch (consumer_data.type) {
97 case LTTNG_CONSUMER_KERNEL:
98 return DEFAULT_KERNEL_TRACE_DIR;
99 case LTTNG_CONSUMER64_UST:
100 /* Fall-through. */
101 case LTTNG_CONSUMER32_UST:
102 return DEFAULT_UST_TRACE_DIR;
103 default:
104 abort();
105 }
106}
107
acdb9057
DG
108/*
109 * Notify a thread lttng pipe to poll back again. This usually means that some
110 * global state has changed, so we simply send the thread back into a poll wait
111 * call.
112 */
113static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
114{
115 struct lttng_consumer_stream *null_stream = NULL;
116
117 assert(pipe);
118
119 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
120}
121
5c635c72
MD
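/* Wake up the thread waiting on the health quit pipe by writing a dummy byte to it. */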
122static void notify_health_quit_pipe(int *pipe)
123{
6cd525e8 124 ssize_t ret;
5c635c72 125
6cd525e8
MD
126 ret = lttng_write(pipe[1], "4", 1);
127 if (ret < 1) {
5c635c72
MD
128 PERROR("write consumer health quit");
129 }
130}
131
d8ef542d
MD
132static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
133 struct lttng_consumer_channel *chan,
a0cbdd2e 134 uint64_t key,
d8ef542d
MD
135 enum consumer_channel_action action)
136{
137 struct consumer_channel_msg msg;
6cd525e8 138 ssize_t ret;
d8ef542d 139
e56251fc
DG
140 memset(&msg, 0, sizeof(msg));
141
d8ef542d
MD
142 msg.action = action;
143 msg.chan = chan;
f21dae48 144 msg.key = key;
6cd525e8
MD
145 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
146 if (ret < (ssize_t) sizeof(msg)) {
147 PERROR("notify_channel_pipe write error");
148 }
d8ef542d
MD
149}
150
a0cbdd2e
MD
151void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
152 uint64_t key)
153{
154 notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
155}
156
d8ef542d
MD
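/*
 * Read one consumer_channel_msg from the channel pipe and return its action,
 * channel pointer and key through the output parameters. Returns the number
 * of bytes read on success, -1 on a short read or error.
 */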
157static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
158 struct lttng_consumer_channel **chan,
a0cbdd2e 159 uint64_t *key,
d8ef542d
MD
160 enum consumer_channel_action *action)
161{
162 struct consumer_channel_msg msg;
6cd525e8 163 ssize_t ret;
d8ef542d 164
6cd525e8
MD
165 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
166 if (ret < (ssize_t) sizeof(msg)) {
167 ret = -1;
168 goto error;
d8ef542d 169 }
6cd525e8
MD
170 *action = msg.action;
171 *chan = msg.chan;
172 *key = msg.key;
173error:
174 return (int) ret;
d8ef542d
MD
175}
176
212d67a2
DG
177/*
178 * Cleanup the stream list of a channel. Those streams are not yet globally
179 * visible
180 */
181static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
182{
183 struct lttng_consumer_stream *stream, *stmp;
184
185 assert(channel);
186
187 /* Delete streams that might have been left in the stream list. */
188 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
189 send_node) {
190 cds_list_del(&stream->send_node);
191 /*
192 * Once a stream is added to this list, its buffers have been created, so we
193 * have a guarantee that this call will succeed. Set the monitor
194 * mode to 0 so we neither lock nor try to delete the stream from the
195 * global hash table.
196 */
197 stream->monitor = 0;
198 consumer_stream_destroy(stream, NULL);
199 }
200}
201
3bd1e081
MD
202/*
203 * Find a stream. The consumer_data.lock must be locked during this
204 * call.
205 */
d88aee68 206static struct lttng_consumer_stream *find_stream(uint64_t key,
8389e4f8 207 struct lttng_ht *ht)
3bd1e081 208{
e4421fec 209 struct lttng_ht_iter iter;
d88aee68 210 struct lttng_ht_node_u64 *node;
e4421fec 211 struct lttng_consumer_stream *stream = NULL;
3bd1e081 212
8389e4f8
DG
213 assert(ht);
214
d88aee68
DG
215 /* -1ULL keys are lookup failures */
216 if (key == (uint64_t) -1ULL) {
7ad0a0cb 217 return NULL;
7a57cf92 218 }
e4421fec 219
6065ceec
DG
220 rcu_read_lock();
221
d88aee68
DG
222 lttng_ht_lookup(ht, &key, &iter);
223 node = lttng_ht_iter_get_node_u64(&iter);
e4421fec
DG
224 if (node != NULL) {
225 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 226 }
e4421fec 227
6065ceec
DG
228 rcu_read_unlock();
229
e4421fec 230 return stream;
3bd1e081
MD
231}
232
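/*
 * Invalidate the key of an existing stream (set it to -1ULL) so that further
 * lookups with that key fail while hash table iterations still visit the
 * node.
 */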
da009f2c 233static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
7ad0a0cb
MD
234{
235 struct lttng_consumer_stream *stream;
236
04253271 237 rcu_read_lock();
ffe60014 238 stream = find_stream(key, ht);
04253271 239 if (stream) {
da009f2c 240 stream->key = (uint64_t) -1ULL;
04253271
MD
241 /*
242 * We don't want the lookup to match, but we still need
243 * to iterate on this stream when iterating over the hash table. Just
244 * change the node key.
245 */
da009f2c 246 stream->node.key = (uint64_t) -1ULL;
04253271
MD
247 }
248 rcu_read_unlock();
7ad0a0cb
MD
249}
250
d56db448
DG
251/*
252 * Return a channel object for the given key.
253 *
254 * RCU read side lock MUST be acquired before calling this function and
255 * protects the channel ptr.
256 */
d88aee68 257struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
3bd1e081 258{
e4421fec 259 struct lttng_ht_iter iter;
d88aee68 260 struct lttng_ht_node_u64 *node;
e4421fec 261 struct lttng_consumer_channel *channel = NULL;
3bd1e081 262
d88aee68
DG
263 /* -1ULL keys are lookup failures */
264 if (key == (uint64_t) -1ULL) {
7ad0a0cb 265 return NULL;
7a57cf92 266 }
e4421fec 267
d88aee68
DG
268 lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
269 node = lttng_ht_iter_get_node_u64(&iter);
e4421fec
DG
270 if (node != NULL) {
271 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 272 }
e4421fec
DG
273
274 return channel;
3bd1e081
MD
275}
276
b5a6470f
DG
277/*
278 * There is a possibility that the consumer does not have enough time between
279 * the close of the channel on the session daemon and the cleanup here. Thus,
280 * once we get a channel add with an existing key, we know for sure that this
281 * channel will eventually get cleaned up when all of its streams are closed.
282 *
283 * This function just nullifies the already existing channel key.
284 */
285static void steal_channel_key(uint64_t key)
286{
287 struct lttng_consumer_channel *channel;
288
289 rcu_read_lock();
290 channel = consumer_find_channel(key);
291 if (channel) {
292 channel->key = (uint64_t) -1ULL;
293 /*
294 * We don't want the lookup to match, but we still need to iterate on
295 * this channel when iterating over the hash table. Just change the
296 * node key.
297 */
298 channel->node.key = (uint64_t) -1ULL;
299 }
300 rcu_read_unlock();
301}
302
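/*
 * RCU callback freeing a channel once all pre-existing readers are done with
 * it. Releases the tracer-specific (UST) resources before freeing the channel
 * itself.
 */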
ffe60014 303static void free_channel_rcu(struct rcu_head *head)
702b1ea4 304{
d88aee68
DG
305 struct lttng_ht_node_u64 *node =
306 caa_container_of(head, struct lttng_ht_node_u64, head);
ffe60014
DG
307 struct lttng_consumer_channel *channel =
308 caa_container_of(node, struct lttng_consumer_channel, node);
702b1ea4 309
b83e03c4
MD
310 switch (consumer_data.type) {
311 case LTTNG_CONSUMER_KERNEL:
312 break;
313 case LTTNG_CONSUMER32_UST:
314 case LTTNG_CONSUMER64_UST:
315 lttng_ustconsumer_free_channel(channel);
316 break;
317 default:
318 ERR("Unknown consumer_data type");
319 abort();
320 }
ffe60014 321 free(channel);
702b1ea4
MD
322}
323
00e2e675
DG
324/*
325 * RCU protected relayd socket pair free.
326 */
ffe60014 327static void free_relayd_rcu(struct rcu_head *head)
00e2e675 328{
d88aee68
DG
329 struct lttng_ht_node_u64 *node =
330 caa_container_of(head, struct lttng_ht_node_u64, head);
00e2e675
DG
331 struct consumer_relayd_sock_pair *relayd =
332 caa_container_of(node, struct consumer_relayd_sock_pair, node);
333
8994307f
DG
334 /*
335 * Close all sockets. This is done in the RCU callback since we don't want the
336 * socket fds to be reassigned, which could create a bad state for the
337 * relayd object.
338 *
339 * We do not have to lock the control socket mutex here since at this stage
340 * there is no one referencing this relayd object.
341 */
342 (void) relayd_close(&relayd->control_sock);
343 (void) relayd_close(&relayd->data_sock);
344
3a84e2f3 345 pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
00e2e675
DG
346 free(relayd);
347}
348
349/*
350 * Destroy and free relayd socket pair object.
00e2e675 351 */
51230d70 352void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
353{
354 int ret;
355 struct lttng_ht_iter iter;
356
173af62f
DG
357 if (relayd == NULL) {
358 return;
359 }
360
00e2e675
DG
361 DBG("Consumer destroy and close relayd socket pair");
362
363 iter.iter.node = &relayd->node.node;
364 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
173af62f 365 if (ret != 0) {
8994307f 366 /* We assume the relayd is being destroyed or has already been destroyed. */
173af62f
DG
367 return;
368 }
00e2e675 369
00e2e675 370 /* RCU free() call */
ffe60014
DG
371 call_rcu(&relayd->node.head, free_relayd_rcu);
372}
373
374/*
375 * Remove a channel from the global list protected by a mutex. This function is
376 * also responsible for freeing its data structures.
377 */
378void consumer_del_channel(struct lttng_consumer_channel *channel)
379{
ffe60014
DG
380 struct lttng_ht_iter iter;
381
d88aee68 382 DBG("Consumer delete channel key %" PRIu64, channel->key);
ffe60014
DG
383
384 pthread_mutex_lock(&consumer_data.lock);
a9838785 385 pthread_mutex_lock(&channel->lock);
ffe60014 386
212d67a2
DG
387 /* Destroy streams that might have been left in the stream list. */
388 clean_channel_stream_list(channel);
51e762e5 389
d3e2ba59
JD
390 if (channel->live_timer_enabled == 1) {
391 consumer_timer_live_stop(channel);
392 }
e9404c27
JG
393 if (channel->monitor_timer_enabled == 1) {
394 consumer_timer_monitor_stop(channel);
395 }
d3e2ba59 396
ffe60014
DG
397 switch (consumer_data.type) {
398 case LTTNG_CONSUMER_KERNEL:
399 break;
400 case LTTNG_CONSUMER32_UST:
401 case LTTNG_CONSUMER64_UST:
402 lttng_ustconsumer_del_channel(channel);
403 break;
404 default:
405 ERR("Unknown consumer_data type");
406 assert(0);
407 goto end;
408 }
409
d2956687
JG
410 lttng_trace_chunk_put(channel->trace_chunk);
411 channel->trace_chunk = NULL;
5c3892a6 412
d2956687
JG
413 if (channel->is_published) {
414 int ret;
415
416 rcu_read_lock();
417 iter.iter.node = &channel->node.node;
418 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
419 assert(!ret);
ffe60014 420
d2956687
JG
421 iter.iter.node = &channel->channels_by_session_id_ht_node.node;
422 ret = lttng_ht_del(consumer_data.channels_by_session_id_ht,
423 &iter);
424 assert(!ret);
425 rcu_read_unlock();
426 }
427
b6921a17
JG
428 channel->is_deleted = true;
429 call_rcu(&channel->node.head, free_channel_rcu);
ffe60014 430end:
a9838785 431 pthread_mutex_unlock(&channel->lock);
ffe60014 432 pthread_mutex_unlock(&consumer_data.lock);
00e2e675
DG
433}
434
228b5bf7
DG
435/*
436 * Iterate over the relayd hash table and destroy each element. Finally,
437 * destroy the whole hash table.
438 */
439static void cleanup_relayd_ht(void)
440{
441 struct lttng_ht_iter iter;
442 struct consumer_relayd_sock_pair *relayd;
443
444 rcu_read_lock();
445
446 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
447 node.node) {
51230d70 448 consumer_destroy_relayd(relayd);
228b5bf7
DG
449 }
450
228b5bf7 451 rcu_read_unlock();
36b588ed
MD
452
453 lttng_ht_destroy(consumer_data.relayd_ht);
228b5bf7
DG
454}
455
8994307f
DG
456/*
457 * Update the end point status of all streams having the given network sequence
458 * index (relayd index).
459 *
460 * The status is set atomically without holding the stream mutex, which is fine
461 * because we handle the write/read race with a pipe wakeup for each thread.
462 */
da009f2c 463static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
8994307f
DG
464 enum consumer_endpoint_status status)
465{
466 struct lttng_ht_iter iter;
467 struct lttng_consumer_stream *stream;
468
da009f2c 469 DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
8994307f
DG
470
471 rcu_read_lock();
472
473 /* Let's begin with metadata */
474 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
475 if (stream->net_seq_idx == net_seq_idx) {
476 uatomic_set(&stream->endpoint_status, status);
477 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
478 }
479 }
480
481 /* Follow up by the data streams */
482 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
483 if (stream->net_seq_idx == net_seq_idx) {
484 uatomic_set(&stream->endpoint_status, status);
485 DBG("Delete flag set to data stream %d", stream->wait_fd);
486 }
487 }
488 rcu_read_unlock();
489}
490
491/*
492 * Clean up a relayd object by flagging every associated stream for deletion,
493 * then destroying the object, meaning removing it from the relayd hash table,
494 * closing the sockets and freeing the memory in an RCU call.
495 *
496 * If a local data context is available, notify the threads that the streams'
497 * state has changed.
498 */
9276e5c8 499void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
8994307f 500{
da009f2c 501 uint64_t netidx;
8994307f
DG
502
503 assert(relayd);
504
9276e5c8 505 DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
9617607b 506
8994307f
DG
507 /* Save the net sequence index before destroying the object */
508 netidx = relayd->net_seq_idx;
509
510 /*
511 * Delete the relayd from the relayd hash table, close the sockets and free
512 * the object in a RCU call.
513 */
51230d70 514 consumer_destroy_relayd(relayd);
8994307f
DG
515
516 /* Set inactive endpoint to all streams */
517 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
518
519 /*
520 * With a local data context, notify the threads that the streams' state
521 * has changed. The write() action on the pipe acts as an "implicit"
522 * memory barrier ordering the updates of the endpoint status before the
523 * read of this status, which happens AFTER receiving this notification.
524 */
9276e5c8
JR
525 notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
526 notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
8994307f
DG
527}
528
a6ba4fe1
DG
529/*
530 * Flag a relayd socket pair for destruction. Destroy it if the refcount
531 * reaches zero.
532 *
533 * RCU read side lock MUST be acquired before calling this function.
534 */
535void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
536{
537 assert(relayd);
538
539 /* Set destroy flag for this object */
540 uatomic_set(&relayd->destroy_flag, 1);
541
542 /* Destroy the relayd if refcount is 0 */
543 if (uatomic_read(&relayd->refcount) == 0) {
51230d70 544 consumer_destroy_relayd(relayd);
a6ba4fe1
DG
545 }
546}
547
3bd1e081 548/*
1d1a276c
DG
549 * Completely destroy a stream from every visible data structure and from the
550 * given hash table, if any.
551 *
552 * Once this call returns, the stream object is no longer usable nor visible.
3bd1e081 553 */
e316aad5
DG
554void consumer_del_stream(struct lttng_consumer_stream *stream,
555 struct lttng_ht *ht)
3bd1e081 556{
1d1a276c 557 consumer_stream_destroy(stream, ht);
3bd1e081
MD
558}
559
5ab66908
MD
560/*
561 * XXX naming of del vs destroy is all mixed up.
562 */
563void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
564{
565 consumer_stream_destroy(stream, data_ht);
566}
567
568void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
569{
570 consumer_stream_destroy(stream, metadata_ht);
571}
572
d9a2e16e
JD
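/*
 * Snapshot the channel attributes that a stream may read without holding the
 * channel lock (currently only the tracefile size) into the stream's
 * channel_read_only_attributes.
 */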
573void consumer_stream_update_channel_attributes(
574 struct lttng_consumer_stream *stream,
575 struct lttng_consumer_channel *channel)
576{
577 stream->channel_read_only_attributes.tracefile_size =
578 channel->tracefile_size;
d9a2e16e
JD
579}
580
d88aee68
DG
581struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
582 uint64_t stream_key,
ffe60014 583 const char *channel_name,
57a269f2 584 uint64_t relayd_id,
53632229 585 uint64_t session_id,
d2956687 586 struct lttng_trace_chunk *trace_chunk,
ffe60014
DG
587 int cpu,
588 int *alloc_ret,
4891ece8 589 enum consumer_channel_type type,
d2956687 590 unsigned int monitor)
3bd1e081 591{
ffe60014 592 int ret;
3bd1e081 593 struct lttng_consumer_stream *stream;
3bd1e081 594
effcf122 595 stream = zmalloc(sizeof(*stream));
3bd1e081 596 if (stream == NULL) {
7a57cf92 597 PERROR("malloc struct lttng_consumer_stream");
ffe60014 598 ret = -ENOMEM;
7a57cf92 599 goto end;
3bd1e081 600 }
7a57cf92 601
d2956687
JG
602 if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
603 ERR("Failed to acquire trace chunk reference during the creation of a stream");
604 ret = -1;
605 goto error;
606 }
d56db448 607
d2956687 608 rcu_read_lock();
3bd1e081 609 stream->key = stream_key;
d2956687 610 stream->trace_chunk = trace_chunk;
3bd1e081
MD
611 stream->out_fd = -1;
612 stream->out_fd_offset = 0;
e5d1a9b3 613 stream->output_written = 0;
ffe60014 614 stream->net_seq_idx = relayd_id;
53632229 615 stream->session_id = session_id;
4891ece8 616 stream->monitor = monitor;
774d490c 617 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
f8f3885c 618 stream->index_file = NULL;
fb83fe64 619 stream->last_sequence_number = -1ULL;
a40a503f 620 stream->rotate_position = -1ULL;
53632229 621 pthread_mutex_init(&stream->lock, NULL);
c585821b 622 pthread_mutex_init(&stream->metadata_timer_lock, NULL);
58b1f425 623
ffe60014
DG
624 /* If channel is the metadata, flag this stream as metadata. */
625 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
626 stream->metadata_flag = 1;
627 /* Metadata is flat out. */
628 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
94d49140
JD
629 /* Live rendez-vous point. */
630 pthread_cond_init(&stream->metadata_rdv, NULL);
631 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
58b1f425 632 } else {
ffe60014
DG
633 /* Format stream name to <channel_name>_<cpu_number> */
634 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
635 channel_name, cpu);
636 if (ret < 0) {
637 PERROR("snprintf stream name");
638 goto error;
639 }
58b1f425 640 }
c30aaa51 641
ffe60014 642 /* Key is always the wait_fd for streams. */
d88aee68 643 lttng_ht_node_init_u64(&stream->node, stream->key);
ffe60014 644
d8ef542d
MD
645 /* Init node per channel id key */
646 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
647
53632229 648 /* Init session id node with the stream session id */
d88aee68 649 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
53632229 650
07b86b52
JD
651 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
652 " relayd_id %" PRIu64 ", session_id %" PRIu64,
653 stream->name, stream->key, channel_key,
654 stream->net_seq_idx, stream->session_id);
d56db448
DG
655
656 rcu_read_unlock();
3bd1e081 657 return stream;
c80048c6
MD
658
659error:
d56db448 660 rcu_read_unlock();
d2956687 661 lttng_trace_chunk_put(stream->trace_chunk);
c80048c6 662 free(stream);
7a57cf92 663end:
ffe60014
DG
664 if (alloc_ret) {
665 *alloc_ret = ret;
666 }
c80048c6 667 return NULL;
3bd1e081
MD
668}
669
670/*
671 * Add a stream to the global list protected by a mutex.
672 */
66d583dc 673void consumer_add_data_stream(struct lttng_consumer_stream *stream)
3bd1e081 674{
5ab66908 675 struct lttng_ht *ht = data_ht;
3bd1e081 676
e316aad5 677 assert(stream);
43c34bc3 678 assert(ht);
c77fc10a 679
d88aee68 680 DBG3("Adding consumer stream %" PRIu64, stream->key);
e316aad5
DG
681
682 pthread_mutex_lock(&consumer_data.lock);
a9838785 683 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 684 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 685 pthread_mutex_lock(&stream->lock);
b0b335c8 686 rcu_read_lock();
e316aad5 687
43c34bc3 688 /* Steal stream identifier to avoid having streams with the same key */
ffe60014 689 steal_stream_key(stream->key, ht);
43c34bc3 690
d88aee68 691 lttng_ht_add_unique_u64(ht, &stream->node);
00e2e675 692
d8ef542d
MD
693 lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
694 &stream->node_channel_id);
695
ca22feea
DG
696 /*
697 * Add stream to the stream_list_ht of the consumer data. No need to steal
698 * the key since the HT does not use it and we allow adding redundant keys
699 * into this table.
700 */
d88aee68 701 lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 702
e316aad5 703 /*
ffe60014
DG
704 * When nb_init_stream_left reaches 0, we don't need to trigger any action
705 * in terms of destroying the associated channel, because the action that
e316aad5
DG
706 * causes the count to become 0 also causes a stream to be added. The
707 * channel deletion will thus be triggered by the following removal of this
708 * stream.
709 */
ffe60014 710 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
711 /* Increment refcount before decrementing nb_init_stream_left */
712 cmm_smp_wmb();
ffe60014 713 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
714 }
715
716 /* Update consumer data once the node is inserted. */
3bd1e081
MD
717 consumer_data.stream_count++;
718 consumer_data.need_update = 1;
719
e316aad5 720 rcu_read_unlock();
2e818a6a 721 pthread_mutex_unlock(&stream->lock);
ec6ea7d0 722 pthread_mutex_unlock(&stream->chan->timer_lock);
a9838785 723 pthread_mutex_unlock(&stream->chan->lock);
3bd1e081 724 pthread_mutex_unlock(&consumer_data.lock);
3bd1e081
MD
725}
726
00e2e675 727/*
3f8e211f
DG
728 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
729 * be acquired before calling this.
00e2e675 730 */
d09e1200 731static int add_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
732{
733 int ret = 0;
d88aee68 734 struct lttng_ht_node_u64 *node;
00e2e675
DG
735 struct lttng_ht_iter iter;
736
ffe60014 737 assert(relayd);
00e2e675 738
00e2e675 739 lttng_ht_lookup(consumer_data.relayd_ht,
d88aee68
DG
740 &relayd->net_seq_idx, &iter);
741 node = lttng_ht_iter_get_node_u64(&iter);
00e2e675 742 if (node != NULL) {
00e2e675
DG
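/* A relayd with this net sequence index already exists; keep the existing one. */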
743 goto end;
744 }
d88aee68 745 lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
00e2e675 746
00e2e675
DG
747end:
748 return ret;
749}
750
751/*
752 * Allocate and return a consumer relayd socket.
753 */
027a694f 754static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
da009f2c 755 uint64_t net_seq_idx)
00e2e675
DG
756{
757 struct consumer_relayd_sock_pair *obj = NULL;
758
da009f2c
MD
759 /* net sequence index of -1 is a failure */
760 if (net_seq_idx == (uint64_t) -1ULL) {
00e2e675
DG
761 goto error;
762 }
763
764 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
765 if (obj == NULL) {
766 PERROR("zmalloc relayd sock");
767 goto error;
768 }
769
770 obj->net_seq_idx = net_seq_idx;
771 obj->refcount = 0;
173af62f 772 obj->destroy_flag = 0;
f96e4545
MD
773 obj->control_sock.sock.fd = -1;
774 obj->data_sock.sock.fd = -1;
d88aee68 775 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
00e2e675
DG
776 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
777
778error:
779 return obj;
780}
781
782/*
783 * Find a relayd socket pair in the global consumer data.
784 *
785 * Return the object if found else NULL.
b0b335c8
MD
786 * RCU read-side lock must be held across this call and while using the
787 * returned object.
00e2e675 788 */
d88aee68 789struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
00e2e675
DG
790{
791 struct lttng_ht_iter iter;
d88aee68 792 struct lttng_ht_node_u64 *node;
00e2e675
DG
793 struct consumer_relayd_sock_pair *relayd = NULL;
794
795 /* Negative keys are lookup failures */
d88aee68 796 if (key == (uint64_t) -1ULL) {
00e2e675
DG
797 goto error;
798 }
799
d88aee68 800 lttng_ht_lookup(consumer_data.relayd_ht, &key,
00e2e675 801 &iter);
d88aee68 802 node = lttng_ht_iter_get_node_u64(&iter);
00e2e675
DG
803 if (node != NULL) {
804 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
805 }
806
00e2e675
DG
807error:
808 return relayd;
809}
810
10a50311
JD
811/*
812 * Find a relayd and send the stream
813 *
814 * Returns 0 on success, < 0 on error
815 */
816int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
817 char *path)
818{
819 int ret = 0;
820 struct consumer_relayd_sock_pair *relayd;
821
822 assert(stream);
823 assert(stream->net_seq_idx != -1ULL);
824 assert(path);
825
826 /* The stream is not metadata. Get relayd reference if exists. */
827 rcu_read_lock();
828 relayd = consumer_find_relayd(stream->net_seq_idx);
829 if (relayd != NULL) {
830 /* Add stream on the relayd */
831 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
832 ret = relayd_add_stream(&relayd->control_sock, stream->name,
5da88b0f 833 get_consumer_domain(), path, &stream->relayd_stream_id,
d2956687
JG
834 stream->chan->tracefile_size,
835 stream->chan->tracefile_count,
836 stream->trace_chunk);
10a50311
JD
837 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
838 if (ret < 0) {
9276e5c8
JR
839 ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
840 lttng_consumer_cleanup_relayd(relayd);
10a50311
JD
841 goto end;
842 }
1c20f0e2 843
10a50311 844 uatomic_inc(&relayd->refcount);
d01178b6 845 stream->sent_to_relayd = 1;
10a50311
JD
846 } else {
847 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
848 stream->key, stream->net_seq_idx);
849 ret = -1;
850 goto end;
851 }
852
853 DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
854 stream->name, stream->key, stream->net_seq_idx);
855
856end:
857 rcu_read_unlock();
858 return ret;
859}
860
a4baae1b
JD
861/*
862 * Find a relayd and send the streams sent message
863 *
864 * Returns 0 on success, < 0 on error
865 */
866int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
867{
868 int ret = 0;
869 struct consumer_relayd_sock_pair *relayd;
870
871 assert(net_seq_idx != -1ULL);
872
873 /* The stream is not metadata. Get relayd reference if exists. */
874 rcu_read_lock();
875 relayd = consumer_find_relayd(net_seq_idx);
876 if (relayd != NULL) {
877 /* Add stream on the relayd */
878 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
879 ret = relayd_streams_sent(&relayd->control_sock);
880 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
881 if (ret < 0) {
9276e5c8
JR
882 ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
883 lttng_consumer_cleanup_relayd(relayd);
a4baae1b
JD
884 goto end;
885 }
886 } else {
887 ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
888 net_seq_idx);
889 ret = -1;
890 goto end;
891 }
892
893 ret = 0;
894 DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
895
896end:
897 rcu_read_unlock();
898 return ret;
899}
900
10a50311
JD
901/*
902 * Find a relayd and close the stream
903 */
904void close_relayd_stream(struct lttng_consumer_stream *stream)
905{
906 struct consumer_relayd_sock_pair *relayd;
907
908 /* The stream is not metadata. Get relayd reference if exists. */
909 rcu_read_lock();
910 relayd = consumer_find_relayd(stream->net_seq_idx);
911 if (relayd) {
912 consumer_stream_relayd_close(stream, relayd);
913 }
914 rcu_read_unlock();
915}
916
00e2e675
DG
917/*
918 * Handle stream for relayd transmission if the stream applies for network
919 * streaming where the net sequence index is set.
920 *
921 * Return destination file descriptor or negative value on error.
922 */
6197aea7 923static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
1d4dfdef
DG
924 size_t data_size, unsigned long padding,
925 struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
926{
927 int outfd = -1, ret;
00e2e675
DG
928 struct lttcomm_relayd_data_hdr data_hdr;
929
930 /* Safety net */
931 assert(stream);
6197aea7 932 assert(relayd);
00e2e675
DG
933
934 /* Reset data header */
935 memset(&data_hdr, 0, sizeof(data_hdr));
936
00e2e675
DG
937 if (stream->metadata_flag) {
938 /* Caller MUST acquire the relayd control socket lock */
939 ret = relayd_send_metadata(&relayd->control_sock, data_size);
940 if (ret < 0) {
941 goto error;
942 }
943
944 /* Metadata are always sent on the control socket. */
6151a90f 945 outfd = relayd->control_sock.sock.fd;
00e2e675
DG
946 } else {
947 /* Set header with stream information */
948 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
949 data_hdr.data_size = htobe32(data_size);
1d4dfdef 950 data_hdr.padding_size = htobe32(padding);
c35f9726 951
39df6d9f
DG
952 /*
953 * Note that net_seq_num below is assigned the *current* value of
954 * next_net_seq_num and only after that is next_net_seq_num
955 * incremented. This is why, when issuing a command on the relayd using
956 * this next value, 1 should always be subtracted in order to compare
957 * the last sequence number seen on the relayd side to the last one sent.
958 */
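/*
 * For example: if next_net_seq_num is 8, this packet is sent with
 * net_seq_num 8 and next_net_seq_num then becomes 9; the last sequence
 * number the relayd can have seen is therefore next_net_seq_num - 1.
 */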
3604f373 959 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
00e2e675
DG
960 /* Other fields are zeroed previously */
961
962 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
963 sizeof(data_hdr));
964 if (ret < 0) {
965 goto error;
966 }
967
3604f373
DG
968 ++stream->next_net_seq_num;
969
00e2e675 970 /* Set to go on data socket */
6151a90f 971 outfd = relayd->data_sock.sock.fd;
00e2e675
DG
972 }
973
974error:
975 return outfd;
976}
977
d2956687
JG
978/*
979 * Trigger a dump of the metadata content. Following/during the successful
980 * completion of this call, the metadata poll thread will start receiving
981 * metadata packets to consume.
982 *
983 * The caller must hold the channel and stream locks.
984 */
985static
986int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
987{
988 int ret;
989
990 ASSERT_LOCKED(stream->chan->lock);
991 ASSERT_LOCKED(stream->lock);
992 assert(stream->metadata_flag);
993 assert(stream->chan->trace_chunk);
994
995 switch (consumer_data.type) {
996 case LTTNG_CONSUMER_KERNEL:
997 /*
998 * Reset the position of what has been read from the
999 * metadata cache to 0 so we can dump it again.
1000 */
1001 ret = kernctl_metadata_cache_dump(stream->wait_fd);
1002 break;
1003 case LTTNG_CONSUMER32_UST:
1004 case LTTNG_CONSUMER64_UST:
1005 /*
1006 * Reset the position pushed from the metadata cache so it
1007 * will write from the beginning on the next push.
1008 */
1009 stream->ust_metadata_pushed = 0;
1010 ret = consumer_metadata_wakeup_pipe(stream->chan);
1011 break;
1012 default:
1013 ERR("Unknown consumer_data type");
1014 abort();
1015 }
1016 if (ret < 0) {
1017 ERR("Failed to dump the metadata cache");
1018 }
1019 return ret;
1020}
1021
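/*
 * Swap the channel's trace chunk: acquire a reference on the new chunk (which
 * may be NULL) and release the reference held on the previous one. A channel
 * that was logically deleted is left untouched. Always returns 0.
 */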
1022static
1023int lttng_consumer_channel_set_trace_chunk(
1024 struct lttng_consumer_channel *channel,
1025 struct lttng_trace_chunk *new_trace_chunk)
1026{
d2956687 1027 pthread_mutex_lock(&channel->lock);
b6921a17
JG
1028 if (channel->is_deleted) {
1029 /*
1030 * The channel has been logically deleted and should no longer
1031 * be used. It has released its reference to its current trace
1032 * chunk and should not acquire a new one.
1033 *
1034 * Return success as there is nothing for the caller to do.
1035 */
1036 goto end;
1037 }
d2956687
JG
1038
1039 /*
1040 * The acquisition of the reference cannot fail (barring
1041 * a severe internal error) since a reference to the published
1042 * chunk is already held by the caller.
1043 */
1044 if (new_trace_chunk) {
1045 const bool acquired_reference = lttng_trace_chunk_get(
1046 new_trace_chunk);
1047
1048 assert(acquired_reference);
1049 }
1050
1051 lttng_trace_chunk_put(channel->trace_chunk);
1052 channel->trace_chunk = new_trace_chunk;
d2956687
JG
1053end:
1054 pthread_mutex_unlock(&channel->lock);
ce1aa6fe 1055 return 0;
d2956687
JG
1056}
1057
3bd1e081 1058/*
ffe60014
DG
1059 * Allocate and return a new lttng_consumer_channel object using the given key
1060 * to initialize the hash table node.
1061 *
1062 * On error, return NULL.
3bd1e081 1063 */
886224ff 1064struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
ffe60014 1065 uint64_t session_id,
d2956687 1066 const uint64_t *chunk_id,
ffe60014
DG
1067 const char *pathname,
1068 const char *name,
57a269f2 1069 uint64_t relayd_id,
1624d5b7
JD
1070 enum lttng_event_output output,
1071 uint64_t tracefile_size,
2bba9e53 1072 uint64_t tracefile_count,
1950109e 1073 uint64_t session_id_per_pid,
ecc48a90 1074 unsigned int monitor,
d7ba1388 1075 unsigned int live_timer_interval,
3d071855 1076 const char *root_shm_path,
d7ba1388 1077 const char *shm_path)
3bd1e081 1078{
d2956687
JG
1079 struct lttng_consumer_channel *channel = NULL;
1080 struct lttng_trace_chunk *trace_chunk = NULL;
1081
1082 if (chunk_id) {
1083 trace_chunk = lttng_trace_chunk_registry_find_chunk(
1084 consumer_data.chunk_registry, session_id,
1085 *chunk_id);
1086 if (!trace_chunk) {
1087 ERR("Failed to find trace chunk reference during creation of channel");
1088 goto end;
1089 }
1090 }
3bd1e081 1091
276b26d1 1092 channel = zmalloc(sizeof(*channel));
3bd1e081 1093 if (channel == NULL) {
7a57cf92 1094 PERROR("malloc struct lttng_consumer_channel");
3bd1e081
MD
1095 goto end;
1096 }
ffe60014
DG
1097
1098 channel->key = key;
3bd1e081 1099 channel->refcount = 0;
ffe60014 1100 channel->session_id = session_id;
1950109e 1101 channel->session_id_per_pid = session_id_per_pid;
ffe60014 1102 channel->relayd_id = relayd_id;
1624d5b7
JD
1103 channel->tracefile_size = tracefile_size;
1104 channel->tracefile_count = tracefile_count;
2bba9e53 1105 channel->monitor = monitor;
ecc48a90 1106 channel->live_timer_interval = live_timer_interval;
a9838785 1107 pthread_mutex_init(&channel->lock, NULL);
ec6ea7d0 1108 pthread_mutex_init(&channel->timer_lock, NULL);
ffe60014 1109
0c759fc9
DG
1110 switch (output) {
1111 case LTTNG_EVENT_SPLICE:
1112 channel->output = CONSUMER_CHANNEL_SPLICE;
1113 break;
1114 case LTTNG_EVENT_MMAP:
1115 channel->output = CONSUMER_CHANNEL_MMAP;
1116 break;
1117 default:
1118 assert(0);
1119 free(channel);
1120 channel = NULL;
1121 goto end;
1122 }
1123
07b86b52
JD
1124 /*
1125 * In monitor mode, the streams associated with the channel will be put in
1126 * a special list ONLY owned by this channel. So, the refcount is set to 1
1127 * here meaning that the channel itself has streams that are referenced.
1128 *
1129 * On a channel deletion, once the channel is no longer visible, the
1130 * refcount is decremented and checked for a zero value to delete it. With
1131 * streams in no monitor mode, it will now be safe to destroy the channel.
1132 */
1133 if (!channel->monitor) {
1134 channel->refcount = 1;
1135 }
1136
ffe60014
DG
1137 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
1138 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
1139
1140 strncpy(channel->name, name, sizeof(channel->name));
1141 channel->name[sizeof(channel->name) - 1] = '\0';
1142
3d071855
MD
1143 if (root_shm_path) {
1144 strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
1145 channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
1146 }
d7ba1388
MD
1147 if (shm_path) {
1148 strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
1149 channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
1150 }
1151
d88aee68 1152 lttng_ht_node_init_u64(&channel->node, channel->key);
5c3892a6
JG
1153 lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
1154 channel->session_id);
d8ef542d
MD
1155
1156 channel->wait_fd = -1;
ffe60014
DG
1157 CDS_INIT_LIST_HEAD(&channel->streams.head);
1158
d2956687
JG
1159 if (trace_chunk) {
1160 int ret = lttng_consumer_channel_set_trace_chunk(channel,
1161 trace_chunk);
1162 if (ret) {
1163 goto error;
1164 }
1165 }
1166
62a7b8ed 1167 DBG("Allocated channel (key %" PRIu64 ")", channel->key);
3bd1e081 1168
3bd1e081 1169end:
d2956687 1170 lttng_trace_chunk_put(trace_chunk);
3bd1e081 1171 return channel;
d2956687
JG
1172error:
1173 consumer_del_channel(channel);
1174 channel = NULL;
1175 goto end;
3bd1e081
MD
1176}
1177
1178/*
1179 * Add a channel to the global list protected by a mutex.
821fffb2 1180 *
b5a6470f 1181 * Always return 0 indicating success.
3bd1e081 1182 */
d8ef542d
MD
1183int consumer_add_channel(struct lttng_consumer_channel *channel,
1184 struct lttng_consumer_local_data *ctx)
3bd1e081 1185{
3bd1e081 1186 pthread_mutex_lock(&consumer_data.lock);
a9838785 1187 pthread_mutex_lock(&channel->lock);
ec6ea7d0 1188 pthread_mutex_lock(&channel->timer_lock);
c77fc10a 1189
b5a6470f
DG
1190 /*
1191 * This gives us a guarantee that the channel we are about to add to the
1192 * channel hash table will be unique. See the comment of steal_channel_key()
1193 * on why we need to steal the channel key at this stage.
1194 */
1195 steal_channel_key(channel->key);
c77fc10a 1196
b5a6470f 1197 rcu_read_lock();
d88aee68 1198 lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
5c3892a6
JG
1199 lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
1200 &channel->channels_by_session_id_ht_node);
6065ceec 1201 rcu_read_unlock();
d2956687 1202 channel->is_published = true;
b5a6470f 1203
ec6ea7d0 1204 pthread_mutex_unlock(&channel->timer_lock);
a9838785 1205 pthread_mutex_unlock(&channel->lock);
3bd1e081 1206 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 1207
b5a6470f 1208 if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
a0cbdd2e 1209 notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
d8ef542d 1210 }
b5a6470f
DG
1211
1212 return 0;
3bd1e081
MD
1213}
1214
1215/*
1216 * Allocate the pollfd structure and the local view of the out fds to avoid
1217 * doing a lookup in the linked list and concurrency issues when writing is
1218 * needed. Called with consumer_data.lock held.
1219 *
1220 * Returns the number of fds in the structures.
1221 */
ffe60014
DG
1222static int update_poll_array(struct lttng_consumer_local_data *ctx,
1223 struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
9a2fcf78 1224 struct lttng_ht *ht, int *nb_inactive_fd)
3bd1e081 1225{
3bd1e081 1226 int i = 0;
e4421fec
DG
1227 struct lttng_ht_iter iter;
1228 struct lttng_consumer_stream *stream;
3bd1e081 1229
ffe60014
DG
1230 assert(ctx);
1231 assert(ht);
1232 assert(pollfd);
1233 assert(local_stream);
1234
3bd1e081 1235 DBG("Updating poll fd array");
9a2fcf78 1236 *nb_inactive_fd = 0;
481d6c57 1237 rcu_read_lock();
43c34bc3 1238 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
1239 /*
1240 * Only active streams with an active end point can be added to the
1241 * poll set and local stream storage of the thread.
1242 *
1243 * There is a potential race here for endpoint_status to be updated
1244 * just after the check. However, this is OK since the stream(s) will
1245 * be deleted once the thread is notified that the end point state has
1246 * changed where this function will be called back again.
9a2fcf78
JD
1247 *
1248 * We track the number of inactive FDs because they still need to be
1249 * closed by the polling thread after a wakeup on the data_pipe or
1250 * metadata_pipe.
8994307f 1251 */
d2956687 1252 if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
9a2fcf78 1253 (*nb_inactive_fd)++;
3bd1e081
MD
1254 continue;
1255 }
7972aab2
DG
1256 /*
1257 * This clobbers the debug output way too much. Uncomment it if you
1258 * need it for debugging purposes.
7972aab2 1259 */
e4421fec 1260 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 1261 (*pollfd)[i].events = POLLIN | POLLPRI;
e4421fec 1262 local_stream[i] = stream;
3bd1e081
MD
1263 i++;
1264 }
481d6c57 1265 rcu_read_unlock();
3bd1e081
MD
1266
1267 /*
50f8ae69 1268 * Insert the consumer_data_pipe at the end of the array and don't
3bd1e081
MD
1269 * increment i so nb_fd is the number of real FD.
1270 */
acdb9057 1271 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
509bb1cf 1272 (*pollfd)[i].events = POLLIN | POLLPRI;
02b3d176
DG
1273
1274 (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
1275 (*pollfd)[i + 1].events = POLLIN | POLLPRI;
3bd1e081
MD
1276 return i;
1277}
1278
1279/*
84382d49
MD
1280 * Poll on the should_quit pipe and the command socket. Return -1 on
1281 * error, 1 if we should exit, 0 if data is available on the command socket.
3bd1e081
MD
1282 */
1283int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1284{
1285 int num_rdy;
1286
88f2b785 1287restart:
3bd1e081
MD
1288 num_rdy = poll(consumer_sockpoll, 2, -1);
1289 if (num_rdy == -1) {
88f2b785
MD
1290 /*
1291 * Restart interrupted system call.
1292 */
1293 if (errno == EINTR) {
1294 goto restart;
1295 }
7a57cf92 1296 PERROR("Poll error");
84382d49 1297 return -1;
3bd1e081 1298 }
509bb1cf 1299 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
3bd1e081 1300 DBG("consumer_should_quit wake up");
84382d49 1301 return 1;
3bd1e081
MD
1302 }
1303 return 0;
3bd1e081
MD
1304}
1305
1306/*
1307 * Set the error socket.
1308 */
ffe60014
DG
1309void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
1310 int sock)
3bd1e081
MD
1311{
1312 ctx->consumer_error_socket = sock;
1313}
1314
1315/*
1316 * Set the command socket path.
1317 */
3bd1e081
MD
1318void lttng_consumer_set_command_sock_path(
1319 struct lttng_consumer_local_data *ctx, char *sock)
1320{
1321 ctx->consumer_command_sock_path = sock;
1322}
1323
1324/*
1325 * Send return code to the session daemon.
1326 * If the socket is not defined, we return 0; it is not a fatal error.
1327 */
ffe60014 1328int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
3bd1e081
MD
1329{
1330 if (ctx->consumer_error_socket > 0) {
1331 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
1332 sizeof(enum lttcomm_sessiond_command));
1333 }
1334
1335 return 0;
1336}
1337
1338/*
228b5bf7
DG
1339 * Close all the tracefiles and stream fds. This MUST be called when all
1340 * instances are destroyed, i.e. when all threads have been joined and have ended.
3bd1e081
MD
1341 */
1342void lttng_consumer_cleanup(void)
1343{
e4421fec 1344 struct lttng_ht_iter iter;
ffe60014 1345 struct lttng_consumer_channel *channel;
e10aec8f 1346 unsigned int trace_chunks_left;
6065ceec
DG
1347
1348 rcu_read_lock();
3bd1e081 1349
ffe60014
DG
1350 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
1351 node.node) {
702b1ea4 1352 consumer_del_channel(channel);
3bd1e081 1353 }
6065ceec
DG
1354
1355 rcu_read_unlock();
d6ce1df2 1356
d6ce1df2 1357 lttng_ht_destroy(consumer_data.channel_ht);
5c3892a6 1358 lttng_ht_destroy(consumer_data.channels_by_session_id_ht);
228b5bf7
DG
1359
1360 cleanup_relayd_ht();
1361
d8ef542d
MD
1362 lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);
1363
228b5bf7
DG
1364 /*
1365 * This HT contains streams that are freed by either the metadata thread or
1366 * the data thread so we do *nothing* on the hash table and simply destroy
1367 * it.
1368 */
1369 lttng_ht_destroy(consumer_data.stream_list_ht);
28cc88f3 1370
e10aec8f
MD
1371 /*
1372 * Trace chunks in the registry may still exist if the session
1373 * daemon has encountered an internal error and could not
1374 * tear down its sessions and/or trace chunks properly.
1375 *
1376 * Release the session daemon's implicit reference to any remaining
1377 * trace chunk and print an error if any trace chunk was found. Note
1378 * that there are _no_ legitimate cases for trace chunks to be left,
1379 * it is a leak. However, it can happen following a crash of the
1380 * session daemon and not emptying the registry would cause an assertion
1381 * to hit.
1382 */
1383 trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk(
1384 consumer_data.chunk_registry);
1385 if (trace_chunks_left) {
1386 ERR("%u trace chunks are leaked by lttng-consumerd. "
1387 "This can be caused by an internal error of the session daemon.",
1388 trace_chunks_left);
1389 }
1390 /* Run all callbacks freeing each chunk. */
1391 rcu_barrier();
28cc88f3 1392 lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
3bd1e081
MD
1393}
1394
1395/*
1396 * Called from signal handler.
1397 */
1398void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1399{
6cd525e8
MD
1400 ssize_t ret;
1401
10211f5c 1402 CMM_STORE_SHARED(consumer_quit, 1);
6cd525e8
MD
1403 ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
1404 if (ret < 1) {
7a57cf92 1405 PERROR("write consumer quit");
3bd1e081 1406 }
ab1027f4
DG
1407
1408 DBG("Consumer flag that it should quit");
3bd1e081
MD
1409}
1410
5199ffc4
JG
1411
1412/*
1413 * Flush pending writes to the trace output file on disk.
1414 */
1415static
00e2e675
DG
1416void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
1417 off_t orig_offset)
3bd1e081 1418{
c7a78aab 1419 int ret;
3bd1e081
MD
1420 int outfd = stream->out_fd;
1421
1422 /*
1423 * This does a blocking write-and-wait on any page that belongs to the
1424 * subbuffer prior to the one we just wrote.
1425 * Don't care about error values, as these are just hints and ways to
1426 * limit the amount of page cache used.
1427 */
ffe60014 1428 if (orig_offset < stream->max_sb_size) {
3bd1e081
MD
1429 return;
1430 }
ffe60014
DG
1431 lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
1432 stream->max_sb_size,
3bd1e081
MD
1433 SYNC_FILE_RANGE_WAIT_BEFORE
1434 | SYNC_FILE_RANGE_WRITE
1435 | SYNC_FILE_RANGE_WAIT_AFTER);
1436 /*
1437 * Give hints to the kernel about how we access the file:
1438 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1439 * we write it.
1440 *
1441 * We need to call fadvise again after the file grows because the
1442 * kernel does not seem to apply fadvise to non-existing parts of the
1443 * file.
1444 *
1445 * Call fadvise _after_ having waited for the page writeback to
1446 * complete because the dirty page writeback semantic is not well
1447 * defined. So it can be expected to lead to lower throughput in
1448 * streaming.
1449 */
c7a78aab 1450 ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
ffe60014 1451 stream->max_sb_size, POSIX_FADV_DONTNEED);
a0d0e127 1452 if (ret && ret != -ENOSYS) {
a74a5f4a
JG
1453 errno = ret;
1454 PERROR("posix_fadvise on fd %i", outfd);
c7a78aab 1455 }
3bd1e081
MD
1456}
1457
1458/*
1459 * Initialise the necessary environment:
1460 * - create a new context
1461 * - create the poll_pipe
1462 * - create the should_quit pipe (for signal handler)
1463 * - create the thread pipe (for splice)
1464 *
1465 * Takes a function pointer as argument; this function is called when data is
1466 * available on a buffer. This function is responsible for doing the
1467 * kernctl_get_next_subbuf, reading the data with mmap or splice depending on the
1468 * buffer configuration, and then kernctl_put_next_subbuf at the end.
1469 *
1470 * Returns a pointer to the new context or NULL on error.
1471 */
1472struct lttng_consumer_local_data *lttng_consumer_create(
1473 enum lttng_consumer_type type,
4078b776 1474 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
d41f73b7 1475 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
1476 int (*recv_channel)(struct lttng_consumer_channel *channel),
1477 int (*recv_stream)(struct lttng_consumer_stream *stream),
30319bcb 1478 int (*update_stream)(uint64_t stream_key, uint32_t state))
3bd1e081 1479{
d8ef542d 1480 int ret;
3bd1e081
MD
1481 struct lttng_consumer_local_data *ctx;
1482
1483 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
1484 consumer_data.type == type);
1485 consumer_data.type = type;
1486
effcf122 1487 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081 1488 if (ctx == NULL) {
7a57cf92 1489 PERROR("allocating context");
3bd1e081
MD
1490 goto error;
1491 }
1492
1493 ctx->consumer_error_socket = -1;
331744e3 1494 ctx->consumer_metadata_socket = -1;
75d83e50 1495 pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
3bd1e081
MD
1496 /* assign the callbacks */
1497 ctx->on_buffer_ready = buffer_ready;
1498 ctx->on_recv_channel = recv_channel;
1499 ctx->on_recv_stream = recv_stream;
1500 ctx->on_update_stream = update_stream;
1501
acdb9057
DG
1502 ctx->consumer_data_pipe = lttng_pipe_open(0);
1503 if (!ctx->consumer_data_pipe) {
3bd1e081
MD
1504 goto error_poll_pipe;
1505 }
1506
02b3d176
DG
1507 ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
1508 if (!ctx->consumer_wakeup_pipe) {
1509 goto error_wakeup_pipe;
1510 }
1511
3bd1e081
MD
1512 ret = pipe(ctx->consumer_should_quit);
1513 if (ret < 0) {
7a57cf92 1514 PERROR("Error creating recv pipe");
3bd1e081
MD
1515 goto error_quit_pipe;
1516 }
1517
d8ef542d
MD
1518 ret = pipe(ctx->consumer_channel_pipe);
1519 if (ret < 0) {
1520 PERROR("Error creating channel pipe");
1521 goto error_channel_pipe;
1522 }
1523
13886d2d
DG
1524 ctx->consumer_metadata_pipe = lttng_pipe_open(0);
1525 if (!ctx->consumer_metadata_pipe) {
fb3a43a9
DG
1526 goto error_metadata_pipe;
1527 }
3bd1e081 1528
e9404c27
JG
1529 ctx->channel_monitor_pipe = -1;
1530
fb3a43a9 1531 return ctx;
3bd1e081 1532
fb3a43a9 1533error_metadata_pipe:
d8ef542d
MD
1534 utils_close_pipe(ctx->consumer_channel_pipe);
1535error_channel_pipe:
d8ef542d 1536 utils_close_pipe(ctx->consumer_should_quit);
3bd1e081 1537error_quit_pipe:
02b3d176
DG
1538 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1539error_wakeup_pipe:
acdb9057 1540 lttng_pipe_destroy(ctx->consumer_data_pipe);
3bd1e081
MD
1541error_poll_pipe:
1542 free(ctx);
1543error:
1544 return NULL;
1545}
1546
282dadbc
MD
1547/*
1548 * Iterate over all streams of the hashtable and free them properly.
1549 */
1550static void destroy_data_stream_ht(struct lttng_ht *ht)
1551{
1552 struct lttng_ht_iter iter;
1553 struct lttng_consumer_stream *stream;
1554
1555 if (ht == NULL) {
1556 return;
1557 }
1558
1559 rcu_read_lock();
1560 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1561 /*
1562 * Ignore return value since we are currently cleaning up so any error
1563 * can't be handled.
1564 */
1565 (void) consumer_del_stream(stream, ht);
1566 }
1567 rcu_read_unlock();
1568
1569 lttng_ht_destroy(ht);
1570}
1571
1572/*
1573 * Iterate over all streams of the metadata hashtable and free them
1574 * properly.
1575 */
1576static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1577{
1578 struct lttng_ht_iter iter;
1579 struct lttng_consumer_stream *stream;
1580
1581 if (ht == NULL) {
1582 return;
1583 }
1584
1585 rcu_read_lock();
1586 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1587 /*
1588 * Ignore return value since we are currently cleaning up so any error
1589 * can't be handled.
1590 */
1591 (void) consumer_del_metadata_stream(stream, ht);
1592 }
1593 rcu_read_unlock();
1594
1595 lttng_ht_destroy(ht);
1596}
1597
3bd1e081
MD
1598/*
1599 * Close all fds associated with the instance and free the context.
1600 */
1601void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1602{
4c462e79
MD
1603 int ret;
1604
ab1027f4
DG
1605 DBG("Consumer destroying it. Closing everything.");
1606
4f2e75b9
DG
1607 if (!ctx) {
1608 return;
1609 }
1610
282dadbc
MD
1611 destroy_data_stream_ht(data_ht);
1612 destroy_metadata_stream_ht(metadata_ht);
1613
4c462e79
MD
1614 ret = close(ctx->consumer_error_socket);
1615 if (ret) {
1616 PERROR("close");
1617 }
331744e3
JD
1618 ret = close(ctx->consumer_metadata_socket);
1619 if (ret) {
1620 PERROR("close");
1621 }
d8ef542d 1622 utils_close_pipe(ctx->consumer_channel_pipe);
acdb9057 1623 lttng_pipe_destroy(ctx->consumer_data_pipe);
13886d2d 1624 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
02b3d176 1625 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
d8ef542d 1626 utils_close_pipe(ctx->consumer_should_quit);
fb3a43a9 1627
3bd1e081
MD
1628 unlink(ctx->consumer_command_sock_path);
1629 free(ctx);
1630}
1631
6197aea7
DG
1632/*
1633 * Write the metadata stream id to the specified file descriptor.
1634 */
1635static int write_relayd_metadata_id(int fd,
1636 struct lttng_consumer_stream *stream,
239f61af 1637 unsigned long padding)
6197aea7 1638{
6cd525e8 1639 ssize_t ret;
1d4dfdef 1640 struct lttcomm_relayd_metadata_payload hdr;
6197aea7 1641
1d4dfdef
DG
1642 hdr.stream_id = htobe64(stream->relayd_stream_id);
1643 hdr.padding_size = htobe32(padding);
6cd525e8
MD
1644 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1645 if (ret < sizeof(hdr)) {
d7b75ec8 1646 /*
6f04ed72 1647 * This error means that the fd's end is closed so ignore the PERROR
d7b75ec8
DG
1648 * not to clubber the error output since this can happen in a normal
1649 * code path.
1650 */
1651 if (errno != EPIPE) {
1652 PERROR("write metadata stream id");
1653 }
1654 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
534d2592
DG
1655 /*
1656 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1657 * handle writing the missing part, so report that as an error and
1658 * don't lie to the caller.
1659 */
1660 ret = -1;
6197aea7
DG
1661 goto end;
1662 }
1d4dfdef
DG
1663 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1664 stream->relayd_stream_id, padding);
6197aea7
DG
1665
1666end:
6cd525e8 1667 return (int) ret;
6197aea7
DG
1668}
1669
3bd1e081 1670/*
09e26845
DG
1671 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1672 * core function for writing trace buffers to either the local filesystem or
1673 * the network.
1674 *
d2956687 1675 * It must be called with the stream and the channel lock held.
79d4ffb7 1676 *
09e26845 1677 * Careful review MUST be done if any changes occur!
3bd1e081
MD
1678 *
1679 * Returns the number of bytes written
1680 */
4078b776 1681ssize_t lttng_consumer_on_read_subbuffer_mmap(
3bd1e081 1682 struct lttng_consumer_local_data *ctx,
1d4dfdef 1683 struct lttng_consumer_stream *stream, unsigned long len,
309167d2 1684 unsigned long padding,
50adc264 1685 struct ctf_packet_index *index)
3bd1e081 1686{
f02e1e8a 1687 unsigned long mmap_offset;
ffe60014 1688 void *mmap_base;
994ab360 1689 ssize_t ret = 0;
f02e1e8a
DG
1690 off_t orig_offset = stream->out_fd_offset;
1691 /* Default is on the disk */
1692 int outfd = stream->out_fd;
f02e1e8a 1693 struct consumer_relayd_sock_pair *relayd = NULL;
8994307f 1694 unsigned int relayd_hang_up = 0;
f02e1e8a
DG
1695
1696 /* RCU lock for the relayd pointer */
1697 rcu_read_lock();
7fd975c5 1698 assert(stream->net_seq_idx != (uint64_t) -1ULL ||
948411cd 1699 stream->trace_chunk);
d2956687 1700
f02e1e8a 1701 /* Flag if the current stream is set for network streaming. */
da009f2c 1702 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a
DG
1703 relayd = consumer_find_relayd(stream->net_seq_idx);
1704 if (relayd == NULL) {
56591bac 1705 ret = -EPIPE;
f02e1e8a
DG
1706 goto end;
1707 }
1708 }
1709
1710 /* get the offset inside the fd to mmap */
3bd1e081
MD
1711 switch (consumer_data.type) {
1712 case LTTNG_CONSUMER_KERNEL:
ffe60014 1713 mmap_base = stream->mmap_base;
f02e1e8a 1714 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
994ab360 1715 if (ret < 0) {
56591bac 1716 PERROR("tracer ctl get_mmap_read_offset");
56591bac
MD
1717 goto end;
1718 }
f02e1e8a 1719 break;
7753dea8
MD
1720 case LTTNG_CONSUMER32_UST:
1721 case LTTNG_CONSUMER64_UST:
ffe60014
DG
1722 mmap_base = lttng_ustctl_get_mmap_base(stream);
1723 if (!mmap_base) {
1724 ERR("read mmap get mmap base for stream %s", stream->name);
994ab360 1725 ret = -EPERM;
ffe60014
DG
1726 goto end;
1727 }
1728 ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
56591bac
MD
1729 if (ret != 0) {
1730 PERROR("tracer ctl get_mmap_read_offset");
994ab360 1731 ret = -EINVAL;
56591bac
MD
1732 goto end;
1733 }
f02e1e8a 1734 break;
3bd1e081
MD
1735 default:
1736 ERR("Unknown consumer_data type");
1737 assert(0);
1738 }
b9182dd9 1739
f02e1e8a
DG
1740 /* Handle stream on the relayd if the output is on the network */
1741 if (relayd) {
1742 unsigned long netlen = len;
1743
1744 /*
1745 * Lock the control socket for the complete duration of the function
1746 * since from this point on we will use the socket.
1747 */
1748 if (stream->metadata_flag) {
1749 /* Metadata requires the control socket. */
1750 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
93ec662e
JD
1751 if (stream->reset_metadata_flag) {
1752 ret = relayd_reset_metadata(&relayd->control_sock,
1753 stream->relayd_stream_id,
1754 stream->metadata_version);
1755 if (ret < 0) {
1756 relayd_hang_up = 1;
1757 goto write_error;
1758 }
1759 stream->reset_metadata_flag = 0;
1760 }
1d4dfdef 1761 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
f02e1e8a
DG
1762 }
1763
1d4dfdef 1764 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
994ab360
DG
1765 if (ret < 0) {
1766 relayd_hang_up = 1;
1767 goto write_error;
1768 }
1769 /* Use the returned socket. */
1770 outfd = ret;
f02e1e8a 1771
994ab360
DG
1772 /* Write metadata stream id before payload */
1773 if (stream->metadata_flag) {
239f61af 1774 ret = write_relayd_metadata_id(outfd, stream, padding);
994ab360 1775 if (ret < 0) {
8994307f
DG
1776 relayd_hang_up = 1;
1777 goto write_error;
1778 }
f02e1e8a 1779 }
1d4dfdef
DG
1780 } else {
1781 /* No streaming, we have to set the len with the full padding */
1782 len += padding;
1624d5b7 1783
93ec662e
JD
1784 if (stream->metadata_flag && stream->reset_metadata_flag) {
1785 ret = utils_truncate_stream_file(stream->out_fd, 0);
1786 if (ret < 0) {
1787 ERR("Reset metadata file");
1788 goto end;
1789 }
1790 stream->reset_metadata_flag = 0;
1791 }
1792
1624d5b7
JD
1793 /*
1794 * Check if we need to change the tracefile before writing the packet.
1795 */
1796 if (stream->chan->tracefile_size > 0 &&
1797 (stream->tracefile_size_current + len) >
1798 stream->chan->tracefile_size) {
d2956687
JG
1799 ret = consumer_stream_rotate_output_files(stream);
1800 if (ret) {
1624d5b7
JD
1801 goto end;
1802 }
309167d2 1803 outfd = stream->out_fd;
a1ae300f 1804 orig_offset = 0;
1624d5b7
JD
1805 }
1806 stream->tracefile_size_current += len;
309167d2
JD
1807 if (index) {
1808 index->offset = htobe64(stream->out_fd_offset);
1809 }
f02e1e8a
DG
1810 }
1811
d02b8372
DG
1812 /*
1813 * This call guarantees that len or less is returned. It's impossible to
1814 * receive a ret value that is bigger than len.
1815 */
1816 ret = lttng_write(outfd, mmap_base + mmap_offset, len);
1817 DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
1818 if (ret < 0 || ((size_t) ret != len)) {
1819 /*
1820 * Report error to caller if nothing was written else at least send the
1821 * amount written.
1822 */
1823 if (ret < 0) {
994ab360 1824 ret = -errno;
f02e1e8a 1825 }
994ab360 1826 relayd_hang_up = 1;
f02e1e8a 1827
d02b8372 1828 /* Socket operation failed. We consider the relayd dead */
fcf0f774 1829 if (errno == EPIPE) {
d02b8372
DG
1830 /*
1831 * This is possible if the fd is closed on the other side
1832 * (outfd) or any write problem. It can be a bit verbose for a
1833 * normal execution if, for instance, the relayd is stopped
1834 * abruptly. This can happen, so keep this as a DBG statement.
1835 */
1836 DBG("Consumer mmap write detected relayd hang up");
994ab360
DG
1837 } else {
1838 /* Unhandled error, print it and stop function right now. */
1839 PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
f02e1e8a 1840 }
994ab360 1841 goto write_error;
d02b8372
DG
1842 }
1843 stream->output_written += ret;
d02b8372
DG
1844
1845 /* This call is useless on a socket so better save a syscall. */
1846 if (!relayd) {
1847 /* This won't block, but will start writeout asynchronously */
1848 lttng_sync_file_range(outfd, stream->out_fd_offset, len,
1849 SYNC_FILE_RANGE_WRITE);
1850 stream->out_fd_offset += len;
f5dbe415 1851 lttng_consumer_sync_trace_file(stream, orig_offset);
f02e1e8a 1852 }
f02e1e8a 1853
8994307f
DG
1854write_error:
1855 /*
1856 * This is a special case that the relayd has closed its socket. Let's
1857 * cleanup the relayd object and all associated streams.
1858 */
1859 if (relayd && relayd_hang_up) {
9276e5c8
JR
1860 ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
1861 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
1862 }
1863
f02e1e8a
DG
1864end:
1865 /* Unlock only if ctrl socket used */
1866 if (relayd && stream->metadata_flag) {
1867 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1868 }
1869
1870 rcu_read_unlock();
994ab360 1871 return ret;
3bd1e081
MD
1872}
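/*
 * Minimal usage sketch (illustration only, assuming a read path similar to
 * the kernel/UST consumers; subbuf_size, padding, index and written are
 * placeholder names): the caller samples the sub-buffer size and padding
 * from the tracer, then invokes the mmap writer with the channel and stream
 * locks held, as required above:
 *
 *   pthread_mutex_lock(&stream->chan->lock);
 *   pthread_mutex_lock(&stream->lock);
 *   written = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
 *           subbuf_size, padding, &index);
 *   pthread_mutex_unlock(&stream->lock);
 *   pthread_mutex_unlock(&stream->chan->lock);
 *
 * A negative return aborts the current packet; a positive return is the
 * number of bytes actually written.
 */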
1873
1874/*
1875 * Splice the data from the ring buffer to the tracefile.
1876 *
79d4ffb7
DG
1877 * It must be called with the stream lock held.
1878 *
3bd1e081
MD
1879 * Returns the number of bytes spliced.
1880 */
4078b776 1881ssize_t lttng_consumer_on_read_subbuffer_splice(
3bd1e081 1882 struct lttng_consumer_local_data *ctx,
1d4dfdef 1883 struct lttng_consumer_stream *stream, unsigned long len,
309167d2 1884 unsigned long padding,
50adc264 1885 struct ctf_packet_index *index)
3bd1e081 1886{
f02e1e8a
DG
1887 ssize_t ret = 0, written = 0, ret_splice = 0;
1888 loff_t offset = 0;
1889 off_t orig_offset = stream->out_fd_offset;
1890 int fd = stream->wait_fd;
1891 /* Default is on the disk */
1892 int outfd = stream->out_fd;
f02e1e8a 1893 struct consumer_relayd_sock_pair *relayd = NULL;
fb3a43a9 1894 int *splice_pipe;
8994307f 1895 unsigned int relayd_hang_up = 0;
f02e1e8a 1896
3bd1e081
MD
1897 switch (consumer_data.type) {
1898 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1899 break;
7753dea8
MD
1900 case LTTNG_CONSUMER32_UST:
1901 case LTTNG_CONSUMER64_UST:
f02e1e8a 1902 /* Not supported for user space tracing */
3bd1e081
MD
1903 return -ENOSYS;
1904 default:
1905 ERR("Unknown consumer_data type");
1906 assert(0);
3bd1e081
MD
1907 }
1908
f02e1e8a
DG
1909 /* RCU lock for the relayd pointer */
1910 rcu_read_lock();
1911
1912 /* Flag if the current stream is set for network streaming. */
da009f2c 1913 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a
DG
1914 relayd = consumer_find_relayd(stream->net_seq_idx);
1915 if (relayd == NULL) {
ad0b0d23 1916 written = -ret;
f02e1e8a
DG
1917 goto end;
1918 }
1919 }
a2361a61 1920 splice_pipe = stream->splice_pipe;
fb3a43a9 1921
f02e1e8a 1922 /* Write metadata stream id before payload */
1d4dfdef 1923 if (relayd) {
ad0b0d23 1924 unsigned long total_len = len;
f02e1e8a 1925
1d4dfdef
DG
1926 if (stream->metadata_flag) {
1927 /*
1928 * Lock the control socket for the complete duration of the function
1929 * since from this point on we will use the socket.
1930 */
1931 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1932
93ec662e
JD
1933 if (stream->reset_metadata_flag) {
1934 ret = relayd_reset_metadata(&relayd->control_sock,
1935 stream->relayd_stream_id,
1936 stream->metadata_version);
1937 if (ret < 0) {
1938 relayd_hang_up = 1;
1939 goto write_error;
1940 }
1941 stream->reset_metadata_flag = 0;
1942 }
239f61af 1943 ret = write_relayd_metadata_id(splice_pipe[1], stream,
1d4dfdef
DG
1944 padding);
1945 if (ret < 0) {
1946 written = ret;
ad0b0d23
DG
1947 relayd_hang_up = 1;
1948 goto write_error;
1d4dfdef
DG
1949 }
1950
1951 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1952 }
1953
1954 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
ad0b0d23
DG
1955 if (ret < 0) {
1956 written = ret;
1957 relayd_hang_up = 1;
1958 goto write_error;
f02e1e8a 1959 }
ad0b0d23
DG
1960 /* Use the returned socket. */
1961 outfd = ret;
1d4dfdef
DG
1962 } else {
1963 /* No streaming, we have to set the len with the full padding */
1964 len += padding;
1624d5b7 1965
93ec662e
JD
1966 if (stream->metadata_flag && stream->reset_metadata_flag) {
1967 ret = utils_truncate_stream_file(stream->out_fd, 0);
1968 if (ret < 0) {
1969 ERR("Reset metadata file");
1970 goto end;
1971 }
1972 stream->reset_metadata_flag = 0;
1973 }
1624d5b7
JD
1974 /*
1975 * Check if we need to change the tracefile before writing the packet.
1976 */
1977 if (stream->chan->tracefile_size > 0 &&
1978 (stream->tracefile_size_current + len) >
1979 stream->chan->tracefile_size) {
d2956687 1980 ret = consumer_stream_rotate_output_files(stream);
1624d5b7 1981 if (ret < 0) {
ad0b0d23 1982 written = ret;
1624d5b7
JD
1983 goto end;
1984 }
309167d2 1985 outfd = stream->out_fd;
a1ae300f 1986 orig_offset = 0;
1624d5b7
JD
1987 }
1988 stream->tracefile_size_current += len;
309167d2 1989 index->offset = htobe64(stream->out_fd_offset);
f02e1e8a
DG
1990 }
1991
1992 while (len > 0) {
1d4dfdef
DG
1993 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1994 (unsigned long)offset, len, fd, splice_pipe[1]);
fb3a43a9 1995 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
f02e1e8a
DG
1996 SPLICE_F_MOVE | SPLICE_F_MORE);
1997 DBG("splice chan to pipe, ret %zd", ret_splice);
1998 if (ret_splice < 0) {
d02b8372 1999 ret = errno;
ad0b0d23 2000 written = -ret;
d02b8372 2001 PERROR("Error in relay splice");
f02e1e8a
DG
2002 goto splice_error;
2003 }
2004
2005 /* Handle stream on the relayd if the output is on the network */
ad0b0d23
DG
2006 if (relayd && stream->metadata_flag) {
2007 size_t metadata_payload_size =
2008 sizeof(struct lttcomm_relayd_metadata_payload);
2009
2010 /* Update counter to fit the spliced data */
2011 ret_splice += metadata_payload_size;
2012 len += metadata_payload_size;
2013 /*
2014 * We do this so the return value can match the len passed as
2015 * argument to this function.
2016 */
2017 written -= metadata_payload_size;
f02e1e8a
DG
2018 }
2019
2020 /* Splice data out */
fb3a43a9 2021 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
f02e1e8a 2022 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
a2361a61
JD
2023 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
2024 outfd, ret_splice);
f02e1e8a 2025 if (ret_splice < 0) {
d02b8372 2026 ret = errno;
ad0b0d23
DG
2027 written = -ret;
2028 relayd_hang_up = 1;
2029 goto write_error;
f02e1e8a 2030 } else if (ret_splice > len) {
d02b8372
DG
2031 /*
2032 * We don't expect this code path to be executed but you never know
2033 * so this is an extra protection against a buggy splice().
2034 */
f02e1e8a 2035 ret = errno;
ad0b0d23 2036 written += ret_splice;
d02b8372
DG
2037 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
2038 len);
f02e1e8a 2039 goto splice_error;
d02b8372
DG
2040 } else {
2041 /* All good, update current len and continue. */
2042 len -= ret_splice;
f02e1e8a 2043 }
f02e1e8a
DG
2044
2045 /* This call is useless on a socket so better save a syscall. */
2046 if (!relayd) {
2047 /* This won't block, but will start writeout asynchronously */
2048 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
2049 SYNC_FILE_RANGE_WRITE);
2050 stream->out_fd_offset += ret_splice;
2051 }
e5d1a9b3 2052 stream->output_written += ret_splice;
f02e1e8a
DG
2053 written += ret_splice;
2054 }
f5dbe415
JG
2055 if (!relayd) {
2056 lttng_consumer_sync_trace_file(stream, orig_offset);
2057 }
f02e1e8a
DG
2058 goto end;
2059
8994307f
DG
2060write_error:
2061 /*
2062 * This is a special case that the relayd has closed its socket. Let's
2063 * cleanup the relayd object and all associated streams.
2064 */
2065 if (relayd && relayd_hang_up) {
9276e5c8
JR
2066 ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
2067 lttng_consumer_cleanup_relayd(relayd);
8994307f
DG
2068 /* Skip splice error so the consumer does not fail */
2069 goto end;
2070 }
2071
f02e1e8a
DG
2072splice_error:
2073 /* send the appropriate error description to sessiond */
2074 switch (ret) {
f02e1e8a 2075 case EINVAL:
f73fabfd 2076 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
2077 break;
2078 case ENOMEM:
f73fabfd 2079 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
2080 break;
2081 case ESPIPE:
f73fabfd 2082 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
2083 break;
2084 }
2085
2086end:
2087 if (relayd && stream->metadata_flag) {
2088 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
2089 }
2090
2091 rcu_read_unlock();
2092 return written;
3bd1e081
MD
2093}
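/*
 * Data-flow note (descriptive, derived from the code above): the splice
 * path never copies the sub-buffer through user space. The data moves
 * through the per-stream splice pipe in two hops:
 *
 *   wait_fd (ring buffer)  --splice-->  splice_pipe[1]
 *   splice_pipe[0]         --splice-->  outfd (tracefile or relayd socket)
 *
 * When streaming metadata to a relayd, the relayd metadata header is first
 * written into splice_pipe[1] so that it precedes the payload on the wire.
 */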
2094
15055ce5
JD
2095/*
2096 * Sample the snapshot positions for a specific fd
2097 *
2098 * Returns 0 on success, < 0 on error
2099 */
2100int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
2101{
2102 switch (consumer_data.type) {
2103 case LTTNG_CONSUMER_KERNEL:
2104 return lttng_kconsumer_sample_snapshot_positions(stream);
2105 case LTTNG_CONSUMER32_UST:
2106 case LTTNG_CONSUMER64_UST:
2107 return lttng_ustconsumer_sample_snapshot_positions(stream);
2108 default:
2109 ERR("Unknown consumer_data type");
2110 assert(0);
2111 return -ENOSYS;
2112 }
2113}
3bd1e081
MD
2114/*
2115 * Take a snapshot for a specific fd
2116 *
2117 * Returns 0 on success, < 0 on error
2118 */
ffe60014 2119int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
2120{
2121 switch (consumer_data.type) {
2122 case LTTNG_CONSUMER_KERNEL:
ffe60014 2123 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
2124 case LTTNG_CONSUMER32_UST:
2125 case LTTNG_CONSUMER64_UST:
ffe60014 2126 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
2127 default:
2128 ERR("Unknown consumer_data type");
2129 assert(0);
2130 return -ENOSYS;
2131 }
3bd1e081
MD
2132}
2133
2134/*
2135 * Get the produced position
2136 *
2137 * Returns 0 on success, < 0 on error
2138 */
ffe60014 2139int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
2140 unsigned long *pos)
2141{
2142 switch (consumer_data.type) {
2143 case LTTNG_CONSUMER_KERNEL:
ffe60014 2144 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
2145 case LTTNG_CONSUMER32_UST:
2146 case LTTNG_CONSUMER64_UST:
ffe60014 2147 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
2148 default:
2149 ERR("Unknown consumer_data type");
2150 assert(0);
2151 return -ENOSYS;
2152 }
2153}
2154
15055ce5
JD
2155/*
2156 * Get the consumed position (free-running counter position in bytes).
2157 *
2158 * Returns 0 on success, < 0 on error
2159 */
2160int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
2161 unsigned long *pos)
2162{
2163 switch (consumer_data.type) {
2164 case LTTNG_CONSUMER_KERNEL:
2165 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2166 case LTTNG_CONSUMER32_UST:
2167 case LTTNG_CONSUMER64_UST:
2168 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2169 default:
2170 ERR("Unknown consumer_data type");
2171 assert(0);
2172 return -ENOSYS;
2173 }
2174}
2175
3bd1e081
MD
2176int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
2177 int sock, struct pollfd *consumer_sockpoll)
2178{
2179 switch (consumer_data.type) {
2180 case LTTNG_CONSUMER_KERNEL:
2181 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
2182 case LTTNG_CONSUMER32_UST:
2183 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
2184 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2185 default:
2186 ERR("Unknown consumer_data type");
2187 assert(0);
2188 return -ENOSYS;
2189 }
2190}
2191
1f8d1c14 2192static
6d574024 2193void lttng_consumer_close_all_metadata(void)
d88aee68
DG
2194{
2195 switch (consumer_data.type) {
2196 case LTTNG_CONSUMER_KERNEL:
2197 /*
2198 * The Kernel consumer has a different metadata scheme so we don't
2199 * close anything because the stream will be closed by the session
2200 * daemon.
2201 */
2202 break;
2203 case LTTNG_CONSUMER32_UST:
2204 case LTTNG_CONSUMER64_UST:
2205 /*
2206 * Close all metadata streams. The metadata hash table is passed and
2207 * this call iterates over it by closing all wakeup fd. This is safe
2208 * because at this point we are sure that the metadata producer is
2209 * either dead or blocked.
2210 */
6d574024 2211 lttng_ustconsumer_close_all_metadata(metadata_ht);
d88aee68
DG
2212 break;
2213 default:
2214 ERR("Unknown consumer_data type");
2215 assert(0);
2216 }
2217}
2218
fb3a43a9
DG
2219/*
2220 * Clean up a metadata stream and free its memory.
2221 */
e316aad5
DG
2222void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
2223 struct lttng_ht *ht)
fb3a43a9 2224{
a6ef8ee6
JG
2225 struct lttng_consumer_channel *channel = NULL;
2226 bool free_channel = false;
fb3a43a9
DG
2227
2228 assert(stream);
2229 /*
2230 * This call should NEVER receive a regular stream. It must always be
2231 * a metadata stream; this is crucial for data structure synchronization.
2232 */
2233 assert(stream->metadata_flag);
2234
e316aad5
DG
2235 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
2236
74251bb8 2237 pthread_mutex_lock(&consumer_data.lock);
a6ef8ee6
JG
2238 /*
2239 * Note that this assumes that a stream's channel is never changed and
2240 * that the stream's lock doesn't need to be taken to sample its
2241 * channel.
2242 */
2243 channel = stream->chan;
2244 pthread_mutex_lock(&channel->lock);
3dad2c0f 2245 pthread_mutex_lock(&stream->lock);
a6ef8ee6 2246 if (channel->metadata_cache) {
081424af 2247 /* Only applicable to userspace consumers. */
a6ef8ee6 2248 pthread_mutex_lock(&channel->metadata_cache->lock);
081424af 2249 }
8994307f 2250
6d574024
DG
2251 /* Remove any reference to that stream. */
2252 consumer_stream_delete(stream, ht);
ca22feea 2253
6d574024
DG
2254 /* Close down everything including the relayd if one. */
2255 consumer_stream_close(stream);
2256 /* Destroy tracer buffers of the stream. */
2257 consumer_stream_destroy_buffers(stream);
fb3a43a9
DG
2258
2259 /* Atomically decrement channel refcount since other threads can use it. */
a6ef8ee6
JG
2260 if (!uatomic_sub_return(&channel->refcount, 1)
2261 && !uatomic_read(&channel->nb_init_stream_left)) {
c30aaa51 2262 /* Go for channel deletion! */
a6ef8ee6 2263 free_channel = true;
fb3a43a9 2264 }
a6ef8ee6 2265 stream->chan = NULL;
fb3a43a9 2266
73811ecc
DG
2267 /*
2268 * Nullify the stream reference so it is not used after deletion. The
6d574024
DG
2269 * channel lock MUST be acquired before being able to check for a NULL
2270 * pointer value.
73811ecc 2271 */
a6ef8ee6 2272 channel->metadata_stream = NULL;
73811ecc 2273
a6ef8ee6
JG
2274 if (channel->metadata_cache) {
2275 pthread_mutex_unlock(&channel->metadata_cache->lock);
081424af 2276 }
3dad2c0f 2277 pthread_mutex_unlock(&stream->lock);
a6ef8ee6 2278 pthread_mutex_unlock(&channel->lock);
74251bb8 2279 pthread_mutex_unlock(&consumer_data.lock);
e316aad5 2280
a6ef8ee6
JG
2281 if (free_channel) {
2282 consumer_del_channel(channel);
e316aad5
DG
2283 }
2284
d2956687
JG
2285 lttng_trace_chunk_put(stream->trace_chunk);
2286 stream->trace_chunk = NULL;
6d574024 2287 consumer_stream_free(stream);
fb3a43a9
DG
2288}
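/*
 * Lock ordering note (descriptive, derived from the function above):
 * consumer_data.lock -> channel->lock -> stream->lock ->
 * metadata_cache->lock. New code paths touching a metadata stream and its
 * channel should take these locks in the same order to avoid deadlocking
 * against this teardown path.
 */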
2289
2290/*
2291 * Action done with the metadata stream when adding it to the consumer internal
2292 * data structures to handle it.
2293 */
66d583dc 2294void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
fb3a43a9 2295{
5ab66908 2296 struct lttng_ht *ht = metadata_ht;
76082088 2297 struct lttng_ht_iter iter;
d88aee68 2298 struct lttng_ht_node_u64 *node;
fb3a43a9 2299
e316aad5
DG
2300 assert(stream);
2301 assert(ht);
2302
d88aee68 2303 DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
e316aad5
DG
2304
2305 pthread_mutex_lock(&consumer_data.lock);
a9838785 2306 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 2307 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 2308 pthread_mutex_lock(&stream->lock);
e316aad5 2309
e316aad5
DG
2310 /*
2311 * From here, refcounts are updated so be _careful_ when returning an error
2312 * after this point.
2313 */
2314
fb3a43a9 2315 rcu_read_lock();
76082088
DG
2316
2317 /*
2318 * Lookup the stream just to make sure it does not exist in our internal
2319 * state. This should NEVER happen.
2320 */
d88aee68
DG
2321 lttng_ht_lookup(ht, &stream->key, &iter);
2322 node = lttng_ht_iter_get_node_u64(&iter);
76082088
DG
2323 assert(!node);
2324
e316aad5 2325 /*
ffe60014
DG
2326 * When nb_init_stream_left reaches 0, we don't need to trigger any action
2327 * in terms of destroying the associated channel, because the action that
e316aad5
DG
2328 * causes the count to become 0 also causes a stream to be added. The
2329 * channel deletion will thus be triggered by the following removal of this
2330 * stream.
2331 */
ffe60014 2332 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
2333 /* Increment refcount before decrementing nb_init_stream_left */
2334 cmm_smp_wmb();
ffe60014 2335 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
2336 }
2337
d88aee68 2338 lttng_ht_add_unique_u64(ht, &stream->node);
ca22feea 2339
446156b4 2340 lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
d8ef542d
MD
2341 &stream->node_channel_id);
2342
ca22feea
DG
2343 /*
2344 * Add stream to the stream_list_ht of the consumer data. No need to steal
2345 * the key since the HT does not use it and we allow to add redundant keys
2346 * into this table.
2347 */
d88aee68 2348 lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 2349
fb3a43a9 2350 rcu_read_unlock();
e316aad5 2351
2e818a6a 2352 pthread_mutex_unlock(&stream->lock);
a9838785 2353 pthread_mutex_unlock(&stream->chan->lock);
ec6ea7d0 2354 pthread_mutex_unlock(&stream->chan->timer_lock);
e316aad5 2355 pthread_mutex_unlock(&consumer_data.lock);
fb3a43a9
DG
2356}
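/*
 * Descriptive note: once added, the metadata stream is reachable from three
 * hash tables: metadata_ht (keyed by the stream key),
 * consumer_data.stream_per_chan_id_ht (keyed by the channel id) and
 * consumer_data.stream_list_ht (keyed by the session id, duplicate keys
 * allowed), which is what the per-channel and per-session lookups rely on.
 */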
2357
8994307f
DG
2358/*
2359 * Delete data streams that are flagged for deletion (endpoint_status).
2360 */
2361static void validate_endpoint_status_data_stream(void)
2362{
2363 struct lttng_ht_iter iter;
2364 struct lttng_consumer_stream *stream;
2365
2366 DBG("Consumer delete flagged data stream");
2367
2368 rcu_read_lock();
2369 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
2370 /* Validate delete flag of the stream */
79d4ffb7 2371 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
8994307f
DG
2372 continue;
2373 }
2374 /* Delete it right now */
2375 consumer_del_stream(stream, data_ht);
2376 }
2377 rcu_read_unlock();
2378}
2379
2380/*
2381 * Delete metadata streams that are flagged for deletion (endpoint_status).
2382 */
2383static void validate_endpoint_status_metadata_stream(
2384 struct lttng_poll_event *pollset)
2385{
2386 struct lttng_ht_iter iter;
2387 struct lttng_consumer_stream *stream;
2388
2389 DBG("Consumer delete flagged metadata stream");
2390
2391 assert(pollset);
2392
2393 rcu_read_lock();
2394 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
2395 /* Validate delete flag of the stream */
79d4ffb7 2396 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
8994307f
DG
2397 continue;
2398 }
2399 /*
2400 * Remove from pollset so the metadata thread can continue without
2401 * blocking on a deleted stream.
2402 */
2403 lttng_poll_del(pollset, stream->wait_fd);
2404
2405 /* Delete it right now */
2406 consumer_del_metadata_stream(stream, metadata_ht);
2407 }
2408 rcu_read_unlock();
2409}
2410
fb3a43a9
DG
2411/*
2412 * Thread polls on metadata file descriptors and writes the data to disk or to the
2413 * network.
2414 */
7d980def 2415void *consumer_thread_metadata_poll(void *data)
fb3a43a9 2416{
1fc79fb4 2417 int ret, i, pollfd, err = -1;
fb3a43a9 2418 uint32_t revents, nb_fd;
e316aad5 2419 struct lttng_consumer_stream *stream = NULL;
fb3a43a9 2420 struct lttng_ht_iter iter;
d88aee68 2421 struct lttng_ht_node_u64 *node;
fb3a43a9
DG
2422 struct lttng_poll_event events;
2423 struct lttng_consumer_local_data *ctx = data;
2424 ssize_t len;
2425
2426 rcu_register_thread();
2427
1fc79fb4
MD
2428 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2429
2d57de81
MD
2430 if (testpoint(consumerd_thread_metadata)) {
2431 goto error_testpoint;
2432 }
2433
9ce5646a
MD
2434 health_code_update();
2435
fb3a43a9
DG
2436 DBG("Thread metadata poll started");
2437
fb3a43a9
DG
2438 /* Size is set to 1 for the consumer_metadata pipe */
2439 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2440 if (ret < 0) {
2441 ERR("Poll set creation failed");
d8ef542d 2442 goto end_poll;
fb3a43a9
DG
2443 }
2444
13886d2d
DG
2445 ret = lttng_poll_add(&events,
2446 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
fb3a43a9
DG
2447 if (ret < 0) {
2448 goto end;
2449 }
2450
2451 /* Main loop */
2452 DBG("Metadata main loop started");
2453
2454 while (1) {
fb3a43a9 2455restart:
7fa2082e 2456 health_code_update();
9ce5646a 2457 health_poll_entry();
7fa2082e 2458 DBG("Metadata poll wait");
fb3a43a9 2459 ret = lttng_poll_wait(&events, -1);
7fa2082e
MD
2460 DBG("Metadata poll return from wait with %d fd(s)",
2461 LTTNG_POLL_GETNB(&events));
9ce5646a 2462 health_poll_exit();
40063ead 2463 DBG("Metadata event caught in thread");
fb3a43a9
DG
2464 if (ret < 0) {
2465 if (errno == EINTR) {
40063ead 2466 ERR("Poll EINTR caught");
fb3a43a9
DG
2467 goto restart;
2468 }
d9607cd7
MD
2469 if (LTTNG_POLL_GETNB(&events) == 0) {
2470 err = 0; /* All is OK */
2471 }
2472 goto end;
fb3a43a9
DG
2473 }
2474
0d9c5d77
DG
2475 nb_fd = ret;
2476
e316aad5 2477 /* From here, the event is a metadata wait fd */
fb3a43a9 2478 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2479 health_code_update();
2480
fb3a43a9
DG
2481 revents = LTTNG_POLL_GETEV(&events, i);
2482 pollfd = LTTNG_POLL_GETFD(&events, i);
2483
13886d2d 2484 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
03e43155 2485 if (revents & LPOLLIN) {
13886d2d
DG
2486 ssize_t pipe_len;
2487
2488 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
2489 &stream, sizeof(stream));
6cd525e8 2490 if (pipe_len < sizeof(stream)) {
03e43155
MD
2491 if (pipe_len < 0) {
2492 PERROR("read metadata stream");
2493 }
fb3a43a9 2494 /*
03e43155
MD
2495 * Remove the pipe from the poll set and continue the loop
2496 * since there might be data to consume.
fb3a43a9 2497 */
03e43155
MD
2498 lttng_poll_del(&events,
2499 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2500 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
fb3a43a9
DG
2501 continue;
2502 }
2503
8994307f
DG
2504 /* A NULL stream means that the state has changed. */
2505 if (stream == NULL) {
2506 /* Check for deleted streams. */
2507 validate_endpoint_status_metadata_stream(&events);
3714380f 2508 goto restart;
8994307f
DG
2509 }
2510
fb3a43a9
DG
2511 DBG("Adding metadata stream %d to poll set",
2512 stream->wait_fd);
2513
fb3a43a9
DG
2514 /* Add metadata stream to the global poll events list */
2515 lttng_poll_add(&events, stream->wait_fd,
6d574024 2516 LPOLLIN | LPOLLPRI | LPOLLHUP);
03e43155
MD
2517 } else if (revents & (LPOLLERR | LPOLLHUP)) {
2518 DBG("Metadata thread pipe hung up");
2519 /*
2520 * Remove the pipe from the poll set and continue the loop
2521 * since there might be data to consume.
2522 */
2523 lttng_poll_del(&events,
2524 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2525 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2526 continue;
2527 } else {
2528 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2529 goto end;
fb3a43a9
DG
2530 }
2531
e316aad5 2532 /* Handle other stream */
fb3a43a9
DG
2533 continue;
2534 }
2535
d09e1200 2536 rcu_read_lock();
d88aee68
DG
2537 {
2538 uint64_t tmp_id = (uint64_t) pollfd;
2539
2540 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2541 }
2542 node = lttng_ht_iter_get_node_u64(&iter);
e316aad5 2543 assert(node);
fb3a43a9
DG
2544
2545 stream = caa_container_of(node, struct lttng_consumer_stream,
58b1f425 2546 node);
fb3a43a9 2547
03e43155
MD
2548 if (revents & (LPOLLIN | LPOLLPRI)) {
2549 /* Get the data out of the metadata file descriptor */
2550 DBG("Metadata available on fd %d", pollfd);
2551 assert(stream->wait_fd == pollfd);
2552
2553 do {
2554 health_code_update();
2555
2556 len = ctx->on_buffer_ready(stream, ctx);
2557 /*
2558 * We don't check the return value here since if we get
83f4233d 2559 * a negative len, it means an error occurred thus we
03e43155
MD
2560 * simply remove it from the poll set and free the
2561 * stream.
2562 */
2563 } while (len > 0);
2564
2565 /* It's ok to have an unavailable sub-buffer */
2566 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2567 /* Clean up stream from consumer and free it. */
2568 lttng_poll_del(&events, stream->wait_fd);
2569 consumer_del_metadata_stream(stream, metadata_ht);
2570 }
2571 } else if (revents & (LPOLLERR | LPOLLHUP)) {
e316aad5 2572 DBG("Metadata fd %d is hup|err.", pollfd);
fb3a43a9
DG
2573 if (!stream->hangup_flush_done
2574 && (consumer_data.type == LTTNG_CONSUMER32_UST
2575 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2576 DBG("Attempting to flush and consume the UST buffers");
2577 lttng_ustconsumer_on_stream_hangup(stream);
2578
2579 /* We just flushed the stream now read it. */
4bb94b75 2580 do {
9ce5646a
MD
2581 health_code_update();
2582
4bb94b75
DG
2583 len = ctx->on_buffer_ready(stream, ctx);
2584 /*
2585 * We don't check the return value here since if we get
83f4233d 2586 * a negative len, it means an error occurred thus we
4bb94b75
DG
2587 * simply remove it from the poll set and free the
2588 * stream.
2589 */
2590 } while (len > 0);
fb3a43a9
DG
2591 }
2592
fb3a43a9 2593 lttng_poll_del(&events, stream->wait_fd);
e316aad5
DG
2594 /*
2595 * This call update the channel states, closes file descriptors
2596 * and securely free the stream.
2597 */
2598 consumer_del_metadata_stream(stream, metadata_ht);
03e43155
MD
2599 } else {
2600 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
6f2f1a70 2601 rcu_read_unlock();
03e43155 2602 goto end;
fb3a43a9 2603 }
e316aad5 2604 /* Release RCU lock for the stream looked up */
d09e1200 2605 rcu_read_unlock();
fb3a43a9
DG
2606 }
2607 }
2608
1fc79fb4
MD
2609 /* All is OK */
2610 err = 0;
fb3a43a9
DG
2611end:
2612 DBG("Metadata poll thread exiting");
fb3a43a9 2613
d8ef542d
MD
2614 lttng_poll_clean(&events);
2615end_poll:
2d57de81 2616error_testpoint:
1fc79fb4
MD
2617 if (err) {
2618 health_error();
2619 ERR("Health error occurred in %s", __func__);
2620 }
2621 health_unregister(health_consumerd);
fb3a43a9
DG
2622 rcu_unregister_thread();
2623 return NULL;
2624}
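/*
 * Descriptive note: metadata streams enter this thread's poll set in two
 * ways only: through the consumer_metadata_pipe read fd added at startup,
 * and afterwards one stream at a time when a stream pointer is received on
 * that pipe. A NULL pointer on the pipe is the convention used to ask the
 * thread to re-validate the endpoint status of all metadata streams.
 */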
2625
3bd1e081 2626/*
e4421fec 2627 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2628 * it to the tracefile if necessary.
2629 */
7d980def 2630void *consumer_thread_data_poll(void *data)
3bd1e081 2631{
1fc79fb4 2632 int num_rdy, num_hup, high_prio, ret, i, err = -1;
3bd1e081
MD
2633 struct pollfd *pollfd = NULL;
2634 /* local view of the streams */
c869f647 2635 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
3bd1e081 2636 /* local view of consumer_data.fds_count */
8bdcc002
JG
2637 int nb_fd = 0;
2638 /* 2 for the consumer_data_pipe and wake up pipe */
2639 const int nb_pipes_fd = 2;
9a2fcf78
JD
2640 /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
2641 int nb_inactive_fd = 0;
3bd1e081 2642 struct lttng_consumer_local_data *ctx = data;
00e2e675 2643 ssize_t len;
3bd1e081 2644
e7b994a3
DG
2645 rcu_register_thread();
2646
1fc79fb4
MD
2647 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2648
2d57de81
MD
2649 if (testpoint(consumerd_thread_data)) {
2650 goto error_testpoint;
2651 }
2652
9ce5646a
MD
2653 health_code_update();
2654
4df6c8cb
MD
2655 local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
2656 if (local_stream == NULL) {
2657 PERROR("local_stream malloc");
2658 goto end;
2659 }
3bd1e081
MD
2660
2661 while (1) {
9ce5646a
MD
2662 health_code_update();
2663
3bd1e081
MD
2664 high_prio = 0;
2665 num_hup = 0;
2666
2667 /*
e4421fec 2668 * the fds set has been updated, we need to update our
3bd1e081
MD
2669 * local array as well
2670 */
2671 pthread_mutex_lock(&consumer_data.lock);
2672 if (consumer_data.need_update) {
0e428499
DG
2673 free(pollfd);
2674 pollfd = NULL;
2675
2676 free(local_stream);
2677 local_stream = NULL;
3bd1e081 2678
8bdcc002 2679 /* Allocate for all fds */
261de637 2680 pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
3bd1e081 2681 if (pollfd == NULL) {
7a57cf92 2682 PERROR("pollfd malloc");
3bd1e081
MD
2683 pthread_mutex_unlock(&consumer_data.lock);
2684 goto end;
2685 }
2686
261de637 2687 local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
747f8642 2688 sizeof(struct lttng_consumer_stream *));
3bd1e081 2689 if (local_stream == NULL) {
7a57cf92 2690 PERROR("local_stream malloc");
3bd1e081
MD
2691 pthread_mutex_unlock(&consumer_data.lock);
2692 goto end;
2693 }
ffe60014 2694 ret = update_poll_array(ctx, &pollfd, local_stream,
9a2fcf78 2695 data_ht, &nb_inactive_fd);
3bd1e081
MD
2696 if (ret < 0) {
2697 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2698 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2699 pthread_mutex_unlock(&consumer_data.lock);
2700 goto end;
2701 }
2702 nb_fd = ret;
2703 consumer_data.need_update = 0;
2704 }
2705 pthread_mutex_unlock(&consumer_data.lock);
2706
4078b776 2707 /* No FDs left and consumer_quit is set: clean up and exit the thread. */
9a2fcf78
JD
2708 if (nb_fd == 0 && nb_inactive_fd == 0 &&
2709 CMM_LOAD_SHARED(consumer_quit) == 1) {
1fc79fb4 2710 err = 0; /* All is OK */
4078b776
MD
2711 goto end;
2712 }
3bd1e081 2713 /* poll on the array of fds */
88f2b785 2714 restart:
261de637 2715 DBG("polling on %d fd", nb_fd + nb_pipes_fd);
cf0bcb51
JG
2716 if (testpoint(consumerd_thread_data_poll)) {
2717 goto end;
2718 }
9ce5646a 2719 health_poll_entry();
261de637 2720 num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
9ce5646a 2721 health_poll_exit();
3bd1e081
MD
2722 DBG("poll num_rdy : %d", num_rdy);
2723 if (num_rdy == -1) {
88f2b785
MD
2724 /*
2725 * Restart interrupted system call.
2726 */
2727 if (errno == EINTR) {
2728 goto restart;
2729 }
7a57cf92 2730 PERROR("Poll error");
f73fabfd 2731 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2732 goto end;
2733 } else if (num_rdy == 0) {
2734 DBG("Polling thread timed out");
2735 goto end;
2736 }
2737
80957876
JG
2738 if (caa_unlikely(data_consumption_paused)) {
2739 DBG("Data consumption paused, sleeping...");
2740 sleep(1);
2741 goto restart;
2742 }
2743
3bd1e081 2744 /*
50f8ae69 2745 * If the consumer_data_pipe triggered the poll, go directly to the
00e2e675
DG
2746 * beginning of the loop to update the array. We want to prioritize
2747 * array update over low-priority reads.
3bd1e081 2748 */
509bb1cf 2749 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
ab30f567 2750 ssize_t pipe_readlen;
04fdd819 2751
50f8ae69 2752 DBG("consumer_data_pipe wake up");
acdb9057
DG
2753 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2754 &new_stream, sizeof(new_stream));
6cd525e8
MD
2755 if (pipe_readlen < sizeof(new_stream)) {
2756 PERROR("Consumer data pipe");
23f5f35d
DG
2757 /* Continue so we can at least handle the current stream(s). */
2758 continue;
2759 }
c869f647
DG
2760
2761 /*
2762 * If the stream is NULL, just ignore it. It's also possible that
2763 * the sessiond poll thread changed the consumer_quit state and is
2764 * waking us up to test it.
2765 */
2766 if (new_stream == NULL) {
8994307f 2767 validate_endpoint_status_data_stream();
c869f647
DG
2768 continue;
2769 }
2770
c869f647 2771 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2772 continue;
2773 }
2774
02b3d176
DG
2775 /* Handle wakeup pipe. */
2776 if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
2777 char dummy;
2778 ssize_t pipe_readlen;
2779
2780 pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
2781 sizeof(dummy));
2782 if (pipe_readlen < 0) {
2783 PERROR("Consumer data wakeup pipe");
2784 }
2785 /* We've been awakened to handle stream(s). */
2786 ctx->has_wakeup = 0;
2787 }
2788
3bd1e081
MD
2789 /* Take care of high priority channels first. */
2790 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2791 health_code_update();
2792
9617607b
DG
2793 if (local_stream[i] == NULL) {
2794 continue;
2795 }
fb3a43a9 2796 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2797 DBG("Urgent read on fd %d", pollfd[i].fd);
2798 high_prio = 1;
4078b776 2799 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 2800 /* it's ok to have an unavailable sub-buffer */
b64403e3 2801 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2802 /* Clean the stream and free it. */
2803 consumer_del_stream(local_stream[i], data_ht);
9617607b 2804 local_stream[i] = NULL;
4078b776
MD
2805 } else if (len > 0) {
2806 local_stream[i]->data_read = 1;
d41f73b7 2807 }
3bd1e081
MD
2808 }
2809 }
2810
4078b776
MD
2811 /*
2812 * If we read high prio channel in this loop, try again
2813 * for more high prio data.
2814 */
2815 if (high_prio) {
3bd1e081
MD
2816 continue;
2817 }
2818
2819 /* Take care of low priority channels. */
4078b776 2820 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2821 health_code_update();
2822
9617607b
DG
2823 if (local_stream[i] == NULL) {
2824 continue;
2825 }
4078b776 2826 if ((pollfd[i].revents & POLLIN) ||
02b3d176
DG
2827 local_stream[i]->hangup_flush_done ||
2828 local_stream[i]->has_data) {
4078b776
MD
2829 DBG("Normal read on fd %d", pollfd[i].fd);
2830 len = ctx->on_buffer_ready(local_stream[i], ctx);
2831 /* it's ok to have an unavailable sub-buffer */
b64403e3 2832 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2833 /* Clean the stream and free it. */
2834 consumer_del_stream(local_stream[i], data_ht);
9617607b 2835 local_stream[i] = NULL;
4078b776
MD
2836 } else if (len > 0) {
2837 local_stream[i]->data_read = 1;
2838 }
2839 }
2840 }
2841
2842 /* Handle hangup and errors */
2843 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2844 health_code_update();
2845
9617607b
DG
2846 if (local_stream[i] == NULL) {
2847 continue;
2848 }
4078b776
MD
2849 if (!local_stream[i]->hangup_flush_done
2850 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2851 && (consumer_data.type == LTTNG_CONSUMER32_UST
2852 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2853 DBG("fd %d is hup|err|nval. Attempting flush and read.",
9617607b 2854 pollfd[i].fd);
4078b776
MD
2855 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2856 /* Attempt read again, for the data we just flushed. */
2857 local_stream[i]->data_read = 1;
2858 }
2859 /*
2860 * If the poll flag is HUP/ERR/NVAL and we have
2861 * read no data in this pass, we can remove the
2862 * stream from its hash table.
2863 */
2864 if ((pollfd[i].revents & POLLHUP)) {
2865 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2866 if (!local_stream[i]->data_read) {
43c34bc3 2867 consumer_del_stream(local_stream[i], data_ht);
9617607b 2868 local_stream[i] = NULL;
4078b776
MD
2869 num_hup++;
2870 }
2871 } else if (pollfd[i].revents & POLLERR) {
2872 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2873 if (!local_stream[i]->data_read) {
43c34bc3 2874 consumer_del_stream(local_stream[i], data_ht);
9617607b 2875 local_stream[i] = NULL;
4078b776
MD
2876 num_hup++;
2877 }
2878 } else if (pollfd[i].revents & POLLNVAL) {
2879 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2880 if (!local_stream[i]->data_read) {
43c34bc3 2881 consumer_del_stream(local_stream[i], data_ht);
9617607b 2882 local_stream[i] = NULL;
4078b776 2883 num_hup++;
3bd1e081
MD
2884 }
2885 }
9617607b
DG
2886 if (local_stream[i] != NULL) {
2887 local_stream[i]->data_read = 0;
2888 }
3bd1e081
MD
2889 }
2890 }
1fc79fb4
MD
2891 /* All is OK */
2892 err = 0;
3bd1e081
MD
2893end:
2894 DBG("polling thread exiting");
0e428499
DG
2895 free(pollfd);
2896 free(local_stream);
fb3a43a9
DG
2897
2898 /*
2899 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2900 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2901 * read side of the pipe. If we close them both, epoll_wait strangely does
2902 * not return and could create an endless wait period if the pipe is the
2903 * only tracked fd in the poll set. The thread will take care of closing
2904 * the read side.
fb3a43a9 2905 */
13886d2d 2906 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
fb3a43a9 2907
2d57de81 2908error_testpoint:
1fc79fb4
MD
2909 if (err) {
2910 health_error();
2911 ERR("Health error occurred in %s", __func__);
2912 }
2913 health_unregister(health_consumerd);
2914
e7b994a3 2915 rcu_unregister_thread();
3bd1e081
MD
2916 return NULL;
2917}
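/*
 * Descriptive note on the pollfd layout used by the data thread above:
 * indices [0, nb_fd) hold the per-stream wait fds filled in by
 * update_poll_array(), pollfd[nb_fd] is the consumer_data_pipe used to
 * publish new streams, and pollfd[nb_fd + 1] is the wakeup pipe; hence the
 * nb_pipes_fd == 2 extra slots in the allocations.
 */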
2918
d8ef542d
MD
2919/*
2920 * Close wake-up end of each stream belonging to the channel. This will
2921 * allow the poll() on the stream read-side to detect when the
2922 * write-side (application) finally closes them.
2923 */
2924static
2925void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
2926{
2927 struct lttng_ht *ht;
2928 struct lttng_consumer_stream *stream;
2929 struct lttng_ht_iter iter;
2930
2931 ht = consumer_data.stream_per_chan_id_ht;
2932
2933 rcu_read_lock();
2934 cds_lfht_for_each_entry_duplicate(ht->ht,
2935 ht->hash_fct(&channel->key, lttng_ht_seed),
2936 ht->match_fct, &channel->key,
2937 &iter.iter, stream, node_channel_id.node) {
f2ad556d
MD
2938 /*
2939 * Protect against teardown with mutex.
2940 */
2941 pthread_mutex_lock(&stream->lock);
2942 if (cds_lfht_is_node_deleted(&stream->node.node)) {
2943 goto next;
2944 }
d8ef542d
MD
2945 switch (consumer_data.type) {
2946 case LTTNG_CONSUMER_KERNEL:
2947 break;
2948 case LTTNG_CONSUMER32_UST:
2949 case LTTNG_CONSUMER64_UST:
b4a650f3
DG
2950 if (stream->metadata_flag) {
2951 /* Safe and protected by the stream lock. */
2952 lttng_ustconsumer_close_metadata(stream->chan);
2953 } else {
2954 /*
2955 * Note: a mutex is taken internally within
2956 * liblttng-ust-ctl to protect timer wakeup_fd
2957 * use from concurrent close.
2958 */
2959 lttng_ustconsumer_close_stream_wakeup(stream);
2960 }
d8ef542d
MD
2961 break;
2962 default:
2963 ERR("Unknown consumer_data type");
2964 assert(0);
2965 }
f2ad556d
MD
2966 next:
2967 pthread_mutex_unlock(&stream->lock);
d8ef542d
MD
2968 }
2969 rcu_read_unlock();
2970}
2971
2972static void destroy_channel_ht(struct lttng_ht *ht)
2973{
2974 struct lttng_ht_iter iter;
2975 struct lttng_consumer_channel *channel;
2976 int ret;
2977
2978 if (ht == NULL) {
2979 return;
2980 }
2981
2982 rcu_read_lock();
2983 cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
2984 ret = lttng_ht_del(ht, &iter);
2985 assert(ret != 0);
2986 }
2987 rcu_read_unlock();
2988
2989 lttng_ht_destroy(ht);
2990}
2991
2992/*
2993 * This thread polls the channel fds to detect when they are being
2994 * closed. It closes all related streams if the channel is detected as
2995 * closed. It is currently only used as a shim layer for UST because the
2996 * consumerd needs to keep the per-stream wakeup end of pipes open for
2997 * periodic flushing.
2998 */
2999void *consumer_thread_channel_poll(void *data)
3000{
1fc79fb4 3001 int ret, i, pollfd, err = -1;
d8ef542d
MD
3002 uint32_t revents, nb_fd;
3003 struct lttng_consumer_channel *chan = NULL;
3004 struct lttng_ht_iter iter;
3005 struct lttng_ht_node_u64 *node;
3006 struct lttng_poll_event events;
3007 struct lttng_consumer_local_data *ctx = data;
3008 struct lttng_ht *channel_ht;
3009
3010 rcu_register_thread();
3011
1fc79fb4
MD
3012 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
3013
2d57de81
MD
3014 if (testpoint(consumerd_thread_channel)) {
3015 goto error_testpoint;
3016 }
3017
9ce5646a
MD
3018 health_code_update();
3019
d8ef542d
MD
3020 channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3021 if (!channel_ht) {
3022 /* ENOMEM at this point. Better to bail out. */
3023 goto end_ht;
3024 }
3025
3026 DBG("Thread channel poll started");
3027
3028 /* Size is set to 1 for the consumer_channel pipe */
3029 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
3030 if (ret < 0) {
3031 ERR("Poll set creation failed");
3032 goto end_poll;
3033 }
3034
3035 ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
3036 if (ret < 0) {
3037 goto end;
3038 }
3039
3040 /* Main loop */
3041 DBG("Channel main loop started");
3042
3043 while (1) {
d8ef542d 3044restart:
7fa2082e
MD
3045 health_code_update();
3046 DBG("Channel poll wait");
9ce5646a 3047 health_poll_entry();
d8ef542d 3048 ret = lttng_poll_wait(&events, -1);
7fa2082e
MD
3049 DBG("Channel poll return from wait with %d fd(s)",
3050 LTTNG_POLL_GETNB(&events));
9ce5646a 3051 health_poll_exit();
40063ead 3052 DBG("Channel event caught in thread");
d8ef542d
MD
3053 if (ret < 0) {
3054 if (errno == EINTR) {
40063ead 3055 ERR("Poll EINTR caught");
d8ef542d
MD
3056 goto restart;
3057 }
d9607cd7
MD
3058 if (LTTNG_POLL_GETNB(&events) == 0) {
3059 err = 0; /* All is OK */
3060 }
d8ef542d
MD
3061 goto end;
3062 }
3063
3064 nb_fd = ret;
3065
3066 /* From here, the event is a channel wait fd */
3067 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
3068 health_code_update();
3069
d8ef542d
MD
3070 revents = LTTNG_POLL_GETEV(&events, i);
3071 pollfd = LTTNG_POLL_GETFD(&events, i);
3072
d8ef542d 3073 if (pollfd == ctx->consumer_channel_pipe[0]) {
03e43155 3074 if (revents & LPOLLIN) {
d8ef542d 3075 enum consumer_channel_action action;
a0cbdd2e 3076 uint64_t key;
d8ef542d 3077
a0cbdd2e 3078 ret = read_channel_pipe(ctx, &chan, &key, &action);
d8ef542d 3079 if (ret <= 0) {
03e43155
MD
3080 if (ret < 0) {
3081 ERR("Error reading channel pipe");
3082 }
3083 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
d8ef542d
MD
3084 continue;
3085 }
3086
3087 switch (action) {
3088 case CONSUMER_CHANNEL_ADD:
3089 DBG("Adding channel %d to poll set",
3090 chan->wait_fd);
3091
3092 lttng_ht_node_init_u64(&chan->wait_fd_node,
3093 chan->wait_fd);
c7260a81 3094 rcu_read_lock();
d8ef542d
MD
3095 lttng_ht_add_unique_u64(channel_ht,
3096 &chan->wait_fd_node);
c7260a81 3097 rcu_read_unlock();
d8ef542d
MD
3098 /* Add channel to the global poll events list */
3099 lttng_poll_add(&events, chan->wait_fd,
03e43155 3100 LPOLLERR | LPOLLHUP);
d8ef542d 3101 break;
a0cbdd2e
MD
3102 case CONSUMER_CHANNEL_DEL:
3103 {
b4a650f3
DG
3104 /*
3105 * This command should never be called if the channel
3106 * has streams monitored by either the data or metadata
3107 * thread. The consumer only notifies this thread with a
3108 * channel del command if it receives a destroy
3109 * channel command from the session daemon, which sends it
3110 * when a command prior to the GET_CHANNEL failed.
3111 */
3112
c7260a81 3113 rcu_read_lock();
a0cbdd2e
MD
3114 chan = consumer_find_channel(key);
3115 if (!chan) {
c7260a81 3116 rcu_read_unlock();
a0cbdd2e
MD
3117 ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
3118 break;
3119 }
3120 lttng_poll_del(&events, chan->wait_fd);
f623cc0b 3121 iter.iter.node = &chan->wait_fd_node.node;
a0cbdd2e
MD
3122 ret = lttng_ht_del(channel_ht, &iter);
3123 assert(ret == 0);
a0cbdd2e 3124
f2a444f1
DG
3125 switch (consumer_data.type) {
3126 case LTTNG_CONSUMER_KERNEL:
3127 break;
3128 case LTTNG_CONSUMER32_UST:
3129 case LTTNG_CONSUMER64_UST:
212d67a2
DG
3130 health_code_update();
3131 /* Destroy streams that might have been left in the stream list. */
3132 clean_channel_stream_list(chan);
f2a444f1
DG
3133 break;
3134 default:
3135 ERR("Unknown consumer_data type");
3136 assert(0);
3137 }
3138
a0cbdd2e
MD
3139 /*
3140 * Release our own refcount. Force channel deletion even if
3141 * streams were not initialized.
3142 */
3143 if (!uatomic_sub_return(&chan->refcount, 1)) {
3144 consumer_del_channel(chan);
3145 }
c7260a81 3146 rcu_read_unlock();
a0cbdd2e
MD
3147 goto restart;
3148 }
d8ef542d
MD
3149 case CONSUMER_CHANNEL_QUIT:
3150 /*
3151 * Remove the pipe from the poll set and continue the loop
3152 * since there might be data to consume.
3153 */
3154 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
3155 continue;
3156 default:
3157 ERR("Unknown action");
3158 break;
3159 }
03e43155
MD
3160 } else if (revents & (LPOLLERR | LPOLLHUP)) {
3161 DBG("Channel thread pipe hung up");
3162 /*
3163 * Remove the pipe from the poll set and continue the loop
3164 * since there might be data to consume.
3165 */
3166 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
3167 continue;
3168 } else {
3169 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
3170 goto end;
d8ef542d
MD
3171 }
3172
3173 /* Handle other stream */
3174 continue;
3175 }
3176
3177 rcu_read_lock();
3178 {
3179 uint64_t tmp_id = (uint64_t) pollfd;
3180
3181 lttng_ht_lookup(channel_ht, &tmp_id, &iter);
3182 }
3183 node = lttng_ht_iter_get_node_u64(&iter);
3184 assert(node);
3185
3186 chan = caa_container_of(node, struct lttng_consumer_channel,
3187 wait_fd_node);
3188
3189 /* Check for error event */
3190 if (revents & (LPOLLERR | LPOLLHUP)) {
3191 DBG("Channel fd %d is hup|err.", pollfd);
3192
3193 lttng_poll_del(&events, chan->wait_fd);
3194 ret = lttng_ht_del(channel_ht, &iter);
3195 assert(ret == 0);
b4a650f3
DG
3196
3197 /*
3198 * This will close the wait fd for each stream associated to
3199 * this channel AND monitored by the data/metadata thread thus
3200 * will be clean by the right thread.
3201 */
d8ef542d 3202 consumer_close_channel_streams(chan);
f2ad556d
MD
3203
3204 /* Release our own refcount */
3205 if (!uatomic_sub_return(&chan->refcount, 1)
3206 && !uatomic_read(&chan->nb_init_stream_left)) {
3207 consumer_del_channel(chan);
3208 }
03e43155
MD
3209 } else {
3210 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
3211 rcu_read_unlock();
3212 goto end;
d8ef542d
MD
3213 }
3214
3215 /* Release RCU lock for the channel looked up */
3216 rcu_read_unlock();
3217 }
3218 }
3219
1fc79fb4
MD
3220 /* All is OK */
3221 err = 0;
d8ef542d
MD
3222end:
3223 lttng_poll_clean(&events);
3224end_poll:
3225 destroy_channel_ht(channel_ht);
3226end_ht:
2d57de81 3227error_testpoint:
d8ef542d 3228 DBG("Channel poll thread exiting");
1fc79fb4
MD
3229 if (err) {
3230 health_error();
3231 ERR("Health error occurred in %s", __func__);
3232 }
3233 health_unregister(health_consumerd);
d8ef542d
MD
3234 rcu_unregister_thread();
3235 return NULL;
3236}
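/*
 * Descriptive note: channel ownership in this thread is refcount based.
 * The thread holds its own reference on every channel received through the
 * channel pipe; that reference is dropped either on CONSUMER_CHANNEL_DEL or
 * when the channel wait fd reports an error/hangup, and the last holder
 * performs consumer_del_channel().
 */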
3237
331744e3
JD
3238static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
3239 struct pollfd *sockpoll, int client_socket)
3240{
3241 int ret;
3242
3243 assert(ctx);
3244 assert(sockpoll);
3245
84382d49
MD
3246 ret = lttng_consumer_poll_socket(sockpoll);
3247 if (ret) {
331744e3
JD
3248 goto error;
3249 }
3250 DBG("Metadata connection on client_socket");
3251
3252 /* Blocking call, waiting for transmission */
3253 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3254 if (ctx->consumer_metadata_socket < 0) {
3255 WARN("On accept metadata");
3256 ret = -1;
3257 goto error;
3258 }
3259 ret = 0;
3260
3261error:
3262 return ret;
3263}
3264
3bd1e081
MD
3265/*
3266 * This thread listens on the consumerd socket and receives the file
3267 * descriptors from the session daemon.
3268 */
7d980def 3269void *consumer_thread_sessiond_poll(void *data)
3bd1e081 3270{
1fc79fb4 3271 int sock = -1, client_socket, ret, err = -1;
3bd1e081
MD
3272 /*
3273 * Structure used to poll for incoming data on the communication socket;
3274 * this avoids using blocking sockets.
3275 */
3276 struct pollfd consumer_sockpoll[2];
3277 struct lttng_consumer_local_data *ctx = data;
3278
e7b994a3
DG
3279 rcu_register_thread();
3280
1fc79fb4
MD
	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	if (testpoint(consumerd_thread_sessiond)) {
		goto error_testpoint;
	}

	health_code_update();

	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* Prepare the FDs to poll: the client socket and the should_quit pipe. */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	client_socket = -1;

	/* Update the polling structure to poll on the established socket. */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	while (1) {
		health_code_update();

		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			if (ret > 0) {
				/* should exit */
				err = 0;
			}
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (CMM_LOAD_SHARED(consumer_quit)) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0;	/* All is OK */
			goto end;
		}
		DBG("received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_all_metadata();

	/*
	 * When all fds have hung up, the polling thread
	 * can exit cleanly.
	 */
	CMM_STORE_SHARED(consumer_quit, 1);

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so as to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);

	notify_health_quit_pipe(health_quit_pipe);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return NULL;
}
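
/*
 * Note on the handshake above: the session daemon connects twice to the
 * command socket path. The first accepted connection becomes the command
 * socket ("sock") and the second one is set up as the metadata socket by
 * set_metadata_socket(). The listening socket is closed as soon as both
 * connections are established.
 */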

ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
		struct lttng_consumer_local_data *ctx)
{
	ssize_t ret;

	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->lock);
	if (stream->metadata_flag) {
		pthread_mutex_lock(&stream->metadata_rdv_lock);
	}

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		ret = lttng_kconsumer_read_subbuffer(stream, ctx);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		ret = -ENOSYS;
		break;
	}

	if (stream->metadata_flag) {
		pthread_cond_broadcast(&stream->metadata_rdv);
		pthread_mutex_unlock(&stream->metadata_rdv_lock);
	}
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);

	return ret;
}
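
/*
 * Lock ordering used by lttng_consumer_read_subbuffer() above:
 * channel lock -> stream lock -> (metadata streams only) metadata_rdv_lock.
 * Waiters blocked on metadata_rdv are woken up once the sub-buffer has been
 * consumed, whether the read succeeded or not.
 */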

int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_on_recv_stream(stream);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_on_recv_stream(stream);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

/*
 * Allocate and set consumer data hash tables.
 */
int lttng_consumer_init(void)
{
	consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!consumer_data.channel_ht) {
		goto error;
	}

	consumer_data.channels_by_session_id_ht =
			lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!consumer_data.channels_by_session_id_ht) {
		goto error;
	}

	consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!consumer_data.relayd_ht) {
		goto error;
	}

	consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!consumer_data.stream_list_ht) {
		goto error;
	}

	consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!consumer_data.stream_per_chan_id_ht) {
		goto error;
	}

	data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!data_ht) {
		goto error;
	}

	metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!metadata_ht) {
		goto error;
	}

	consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
	if (!consumer_data.chunk_registry) {
		goto error;
	}

	return 0;

error:
	return -1;
}
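
/*
 * All of the tables created above use 64-bit integer keys
 * (LTTNG_HT_TYPE_U64): a channel key, a session id, a relayd net_seq_idx or
 * a stream identifier, depending on the table.
 */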

/*
 * Process the ADD_RELAYD command received by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire a RCU read side lock before calling it.
 */
void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
		struct lttng_consumer_local_data *ctx, int sock,
		struct pollfd *consumer_sockpoll,
		struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
		uint64_t relayd_session_id)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct consumer_relayd_sock_pair *relayd = NULL;

	assert(ctx);
	assert(relayd_sock);

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == NULL) {
		assert(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == NULL) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status message so
		 * we can notify the session daemon and continue our work without
		 * killing everything.
		 */
	} else {
		/*
		 * relayd key should never be found for control socket.
		 */
		assert(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		/* Needing to exit in the middle of a command: error. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon. */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		fd = -1;	/* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. In any case,
		 * we don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD. */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket. */
		lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
		ret = lttcomm_create_sock(&relayd->control_sock.sock);
		/* Handle create_sock error. */
		if (ret < 0) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		}
		/*
		 * Close the socket created internally by
		 * lttcomm_create_sock, so we can replace it by the one
		 * received from sessiond.
		 */
		if (close(relayd->control_sock.sock.fd)) {
			PERROR("close");
		}

		/* Assign new file descriptor. */
		relayd->control_sock.sock.fd = fd;
		/* Assign version values. */
		relayd->control_sock.major = relayd_sock->major;
		relayd->control_sock.minor = relayd_sock->minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket. */
		lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
		ret = lttcomm_create_sock(&relayd->data_sock.sock);
		/* Handle create_sock error. */
		if (ret < 0) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		}
		/*
		 * Close the socket created internally by
		 * lttcomm_create_sock, so we can replace it by the one
		 * received from sessiond.
		 */
		if (close(relayd->data_sock.sock.fd)) {
			PERROR("close");
		}

		/* Assign new file descriptor. */
		relayd->data_sock.sock.fd = fd;
		/* Assign version values. */
		relayd->data_sock.major = relayd_sock->major;
		relayd->data_sock.minor = relayd_sock->minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
			sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
			relayd->net_seq_idx, fd);
	/*
	 * We gave the ownership of the fd to the relayd structure. Set the
	 * fd to -1 so we don't call close() on it in the error path below.
	 */
	fd = -1;

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	relayd->ctx = ctx;
	add_relayd(relayd);

	/* All good! */
	return;

error:
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	if (relayd_created) {
		free(relayd);
	}
}
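
/*
 * The exchange above is a two-step protocol: a status message is sent back to
 * the session daemon before the relayd socket's file descriptor is received
 * over the UNIX socket's ancillary data, and a final status is sent once the
 * socket pair has been set up. Ownership of the received fd is transferred to
 * the relayd socket pair object.
 */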

/*
 * Search for a relayd associated to the session id and return the reference.
 *
 * An RCU read side lock MUST be acquired before calling this function and
 * held until the relayd object is no longer necessary.
 */
static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd = NULL;

	/* Iterate over all relayd since they are indexed by net_seq_idx. */
	cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
			node.node) {
		/*
		 * Check by sessiond id which is unique here where the relayd session
		 * id might not be when having multiple relayd.
		 */
		if (relayd->sessiond_session_id == id) {
			/* Found the relayd. There can be only one per id. */
			goto found;
		}
	}

	return NULL;

found:
	return relayd;
}

/*
 * Check if, for a given session id, there is still data needed to be
 * extracted from the buffers.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = NULL;
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	rcu_read_lock();
	pthread_mutex_lock(&consumer_data.lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		assert(0);
	}

	/* Ease our life a bit. */
	ht = consumer_data.stream_list_ht;

	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&id, lttng_ht_seed),
			ht->match_fct, &id,
			&iter.iter, stream, node_session_id.node) {
		pthread_mutex_lock(&stream->lock);

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		pthread_mutex_unlock(&stream->lock);
	}

	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock,
				relayd->relayd_session_id);
		if (ret < 0) {
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			/* Communication error with the relayd; assume no data is pending. */
			goto data_not_pending;
		}

		cds_lfht_for_each_entry_duplicate(ht->ht,
				ht->hash_fct(&id, lttng_ht_seed),
				ht->match_fct, &id,
				&iter.iter, stream, node_session_id.node) {
			if (stream->metadata_flag) {
				ret = relayd_quiescent_control(&relayd->control_sock,
						stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
						stream->relayd_stream_id,
						stream->next_net_seq_num - 1);
			}

			if (ret == 1) {
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_pending;
			} else if (ret < 0) {
				ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
				lttng_consumer_cleanup_relayd(relayd);
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_not_pending;
			}
		}

		/* Send end command for data pending. */
		ret = relayd_end_data_pending(&relayd->control_sock,
				relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 1;
}
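
/*
 * Summary of the data-pending check above: local stream buffers are checked
 * first; if they are empty and the session streams to a relay daemon, the
 * relayd is queried (begin/end data pending commands) to find out whether
 * any data is still in flight on the network.
 */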

/*
 * Send a ret code status message to the sessiond daemon.
 *
 * Return the sendmsg() return value.
 */
int consumer_send_status_msg(int sock, int ret_code)
{
	struct lttcomm_consumer_status_msg msg;

	memset(&msg, 0, sizeof(msg));
	msg.ret_code = ret_code;

	return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}

/*
 * Send a channel status message to the sessiond daemon.
 *
 * Return the sendmsg() return value.
 */
int consumer_send_status_channel(int sock,
		struct lttng_consumer_channel *channel)
{
	struct lttcomm_consumer_status_channel msg;

	assert(sock >= 0);

	memset(&msg, 0, sizeof(msg));
	if (!channel) {
		msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
	} else {
		msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
		msg.key = channel->key;
		msg.stream_count = channel->streams.count;
	}

	return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}

unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
		unsigned long produced_pos, uint64_t nb_packets_per_stream,
		uint64_t max_sb_size)
{
	unsigned long start_pos;

	if (!nb_packets_per_stream) {
		return consumed_pos;	/* Grab everything */
	}
	start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size);
	start_pos -= max_sb_size * nb_packets_per_stream;
	if ((long) (start_pos - consumed_pos) < 0) {
		return consumed_pos;	/* Grab everything */
	}
	return start_pos;
}
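
/*
 * Worked example for consumer_get_consume_start_pos(), with hypothetical
 * values: max_sb_size = 4096, nb_packets_per_stream = 4,
 * produced_pos = 0x12345 and consumed_pos = 0x2000. The produced position is
 * first floor-aligned to 0x12000, then 4 * 4096 = 0x4000 is subtracted,
 * giving 0xe000. Since 0xe000 is ahead of consumed_pos, the capture starts
 * at 0xe000, i.e. only the last four complete sub-buffers are grabbed.
 */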

static
int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
{
	int ret = 0;

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		if (producer_active) {
			ret = kernctl_buffer_flush(stream->wait_fd);
			if (ret < 0) {
				ERR("Failed to flush kernel stream");
				goto end;
			}
		} else {
			ret = kernctl_buffer_flush_empty(stream->wait_fd);
			if (ret < 0) {
				ERR("Failed to flush kernel stream");
				goto end;
			}
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_flush_buffer(stream, producer_active);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}

end:
	return ret;
}

/*
 * Sample the rotate position for all the streams of a channel. If a stream
 * is already at the rotate position (produced == consumed), we flag it as
 * ready for rotation. The rotation of ready streams occurs after we have
 * replied to the session daemon that we have finished sampling the positions.
 * Must be called with RCU read-side lock held to ensure existence of channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
		uint64_t key, uint64_t relayd_id, uint32_t metadata,
		struct lttng_consumer_local_data *ctx)
{
	int ret;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
	struct lttng_dynamic_array stream_rotation_positions;
	uint64_t next_chunk_id, stream_count = 0;
	enum lttng_trace_chunk_status chunk_status;
	const bool is_local_trace = relayd_id == -1ULL;
	struct consumer_relayd_sock_pair *relayd = NULL;
	bool rotating_to_new_chunk = true;

	DBG("Consumer sample rotate position for channel %" PRIu64, key);

	lttng_dynamic_array_init(&stream_rotation_positions,
			sizeof(struct relayd_stream_rotation_position), NULL);

	rcu_read_lock();

	pthread_mutex_lock(&channel->lock);
	assert(channel->trace_chunk);
	chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
			&next_chunk_id);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret = -1;
		goto end_unlock_channel;
	}

	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key, &iter.iter,
			stream, node_channel_id.node) {
		unsigned long produced_pos = 0, consumed_pos = 0;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		if (stream->trace_chunk == stream->chan->trace_chunk) {
			rotating_to_new_chunk = false;
		}

		/*
		 * Do not flush an empty packet when rotating from a NULL trace
		 * chunk. The stream has no means to output data, and the prior
		 * rotation which rotated to NULL performed that side-effect already.
		 */
		if (stream->trace_chunk) {
			/*
			 * For a metadata stream, do an active flush, which does not
			 * produce empty packets. For data streams, do an empty flush;
			 * this ensures we have at least one packet in each stream per
			 * trace chunk, even if no data was produced.
			 */
			ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
			if (ret < 0) {
				ERR("Failed to flush stream %" PRIu64 " during channel rotation",
						stream->key);
				goto end_unlock_stream;
			}
		}

		ret = lttng_consumer_take_snapshot(stream);
		if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
			ERR("Failed to sample snapshot position during channel rotation");
			goto end_unlock_stream;
		}
		if (!ret) {
			ret = lttng_consumer_get_produced_snapshot(stream,
					&produced_pos);
			if (ret < 0) {
				ERR("Failed to sample produced position during channel rotation");
				goto end_unlock_stream;
			}

			ret = lttng_consumer_get_consumed_snapshot(stream,
					&consumed_pos);
			if (ret < 0) {
				ERR("Failed to sample consumed position during channel rotation");
				goto end_unlock_stream;
			}
		}
		/*
		 * Align produced position on the start-of-packet boundary of the first
		 * packet going into the next trace chunk.
		 */
		produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
		if (consumed_pos == produced_pos) {
			DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
					stream->key, produced_pos, consumed_pos);
			stream->rotate_ready = true;
		} else {
			DBG("Different consumed and produced positions "
					"for stream %" PRIu64 " produced = %lu consumed = %lu",
					stream->key, produced_pos, consumed_pos);
		}
		/*
		 * The rotation position is based on the packet_seq_num of the
		 * packet following the last packet that was consumed for this
		 * stream, incremented by the offset between produced and
		 * consumed positions. This rotation position is a lower bound
		 * (inclusive) at which the next trace chunk starts. Since it
		 * is a lower bound, it is OK if the packet_seq_num does not
		 * correspond exactly to the same packet identified by the
		 * consumed_pos, which can happen in overwrite mode.
		 */
		if (stream->sequence_number_unavailable) {
			/*
			 * Rotation should never be performed on a session which
			 * interacts with a pre-2.8 lttng-modules, which does
			 * not implement packet sequence number.
			 */
			ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
					stream->key);
			ret = -1;
			goto end_unlock_stream;
		}
		stream->rotate_position = stream->last_sequence_number + 1 +
				((produced_pos - consumed_pos) / stream->max_sb_size);
		DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
				stream->key, stream->rotate_position);

		if (!is_local_trace) {
			/*
			 * The relay daemon control protocol expects a rotation
			 * position as "the sequence number of the first packet
			 * _after_ the current trace chunk".
			 */
			const struct relayd_stream_rotation_position position = {
				.stream_id = stream->relayd_stream_id,
				.rotate_at_seq_num = stream->rotate_position,
			};

			ret = lttng_dynamic_array_add_element(
					&stream_rotation_positions,
					&position);
			if (ret) {
				ERR("Failed to allocate stream rotation position");
				goto end_unlock_stream;
			}
			stream_count++;
		}
		pthread_mutex_unlock(&stream->lock);
	}
	stream = NULL;
	pthread_mutex_unlock(&channel->lock);

	if (is_local_trace) {
		ret = 0;
		goto end;
	}

	relayd = consumer_find_relayd(relayd_id);
	if (!relayd) {
		ERR("Failed to find relayd %" PRIu64, relayd_id);
		ret = -1;
		goto end;
	}

	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
	ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
			rotating_to_new_chunk ? &next_chunk_id : NULL,
			(const struct relayd_stream_rotation_position *)
					stream_rotation_positions.buffer.data);
	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	if (ret < 0) {
		ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
				relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
		goto end;
	}

	ret = 0;
	goto end;

end_unlock_stream:
	pthread_mutex_unlock(&stream->lock);
end_unlock_channel:
	pthread_mutex_unlock(&channel->lock);
end:
	rcu_read_unlock();
	lttng_dynamic_array_reset(&stream_rotation_positions);
	return ret;
}
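
/*
 * The rotation position computed above is expressed in packet sequence
 * numbers: last_sequence_number + 1 accounts for the packet following the
 * last consumed packet, and (produced_pos - consumed_pos) / max_sb_size adds
 * one packet per sub-buffer still waiting to be consumed. For network
 * sessions, those per-stream positions are then sent in a single
 * relayd_rotate_streams() call so the relay daemon can switch every stream
 * to the next chunk at a consistent point.
 */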

static
int consumer_clear_buffer(struct lttng_consumer_stream *stream)
{
	int ret = 0;
	unsigned long consumed_pos_before, consumed_pos_after;

	ret = lttng_consumer_sample_snapshot_positions(stream);
	if (ret < 0) {
		ERR("Taking snapshot positions");
		goto end;
	}

	ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
	if (ret < 0) {
		ERR("Consumed snapshot position");
		goto end;
	}

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		ret = kernctl_buffer_clear(stream->wait_fd);
		if (ret < 0) {
			ERR("Failed to clear kernel stream");
			goto end;
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_clear_buffer(stream);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}

	ret = lttng_consumer_sample_snapshot_positions(stream);
	if (ret < 0) {
		ERR("Taking snapshot positions");
		goto end;
	}
	ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
	if (ret < 0) {
		ERR("Consumed snapshot position");
		goto end;
	}
	DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
end:
	return ret;
}

static
int consumer_clear_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	ret = consumer_flush_buffer(stream, 1);
	if (ret < 0) {
		ERR("Failed to flush stream %" PRIu64 " during channel clear",
				stream->key);
		ret = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	ret = consumer_clear_buffer(stream);
	if (ret < 0) {
		ERR("Failed to clear stream %" PRIu64 " during channel clear",
				stream->key);
		ret = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	ret = LTTCOMM_CONSUMERD_SUCCESS;
error:
	return ret;
}

static
int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
{
	int ret;
	struct lttng_consumer_stream *stream;

	rcu_read_lock();
	pthread_mutex_lock(&channel->lock);
	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		health_code_update();
		pthread_mutex_lock(&stream->lock);
		ret = consumer_clear_stream(stream);
		if (ret) {
			goto error_unlock;
		}
		pthread_mutex_unlock(&stream->lock);
	}
	pthread_mutex_unlock(&channel->lock);
	rcu_read_unlock();
	return 0;

error_unlock:
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&channel->lock);
	rcu_read_unlock();
	if (ret) {
		goto error;
	}
	ret = LTTCOMM_CONSUMERD_SUCCESS;
error:
	return ret;
}

/*
 * Check if a stream is ready to be rotated after extracting it.
 *
 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
 * error. Stream lock must be held.
 */
int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
{
	DBG("Check is rotate ready for stream %" PRIu64
			" ready %u rotate_position %" PRIu64
			" last_sequence_number %" PRIu64,
			stream->key, stream->rotate_ready,
			stream->rotate_position, stream->last_sequence_number);
	if (stream->rotate_ready) {
		return 1;
	}

	/*
	 * If packet seq num is unavailable, it means we are interacting
	 * with a pre-2.8 lttng-modules which does not implement the
	 * sequence number. Rotation should never be used by sessiond in this
	 * scenario.
	 */
	if (stream->sequence_number_unavailable) {
		ERR("Internal error: rotation used on stream %" PRIu64
				" with unavailable sequence number",
				stream->key);
		return -1;
	}

	if (stream->rotate_position == -1ULL ||
			stream->last_sequence_number == -1ULL) {
		return 0;
	}

	/*
	 * Rotate position not reached yet. The stream rotate position is
	 * the position of the next packet belonging to the next trace chunk,
	 * but consumerd considers rotation ready when reaching the last
	 * packet of the current chunk, hence the "rotate_position - 1".
	 */

	DBG("Check is rotate ready for stream %" PRIu64
			" last_sequence_number %" PRIu64
			" rotate_position %" PRIu64,
			stream->key, stream->last_sequence_number,
			stream->rotate_position);
	if (stream->last_sequence_number >= stream->rotate_position - 1) {
		return 1;
	}

	return 0;
}
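
/*
 * Example, with hypothetical values: if rotate_position is 100, the stream
 * is considered ready for rotation as soon as last_sequence_number reaches
 * 99, i.e. once the last packet of the current chunk has been consumed.
 */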

/*
 * Reset the state for a stream after a rotation occurred.
 */
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
{
	DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64,
			stream->key);
	stream->rotate_position = -1ULL;
	stream->rotate_ready = false;
}

/*
 * Perform the rotation of a local stream file.
 */
static
int rotate_local_stream(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream)
{
	int ret = 0;

	DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
			stream->key,
			stream->chan->key);
	stream->tracefile_size_current = 0;
	stream->tracefile_count_current = 0;

	if (stream->out_fd >= 0) {
		ret = close(stream->out_fd);
		if (ret) {
			PERROR("Failed to close stream out_fd of channel \"%s\"",
					stream->chan->name);
		}
		stream->out_fd = -1;
	}

	if (stream->index_file) {
		lttng_index_file_put(stream->index_file);
		stream->index_file = NULL;
	}

	if (!stream->trace_chunk) {
		goto end;
	}

	ret = consumer_stream_create_output_files(stream, true);
end:
	return ret;
}

/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * Return 0 on success, a negative value on error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream)
{
	int ret;

	DBG("Consumer rotate stream %" PRIu64, stream->key);

	/*
	 * Update the stream's 'current' chunk to the session's (channel)
	 * now-current chunk.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	if (stream->chan->trace_chunk == stream->trace_chunk) {
		/*
		 * A channel can be rotated and not have a "next" chunk
		 * to transition to. In that case, the channel's "current chunk"
		 * has not been closed yet, but it has not been updated to
		 * a "next" trace chunk either. Hence, the stream, like its
		 * parent channel, becomes part of no chunk and can't output
		 * anything until a new trace chunk is created.
		 */
		stream->trace_chunk = NULL;
	} else if (stream->chan->trace_chunk &&
			!lttng_trace_chunk_get(stream->chan->trace_chunk)) {
		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
		ret = -1;
		goto error;
	} else {
		/*
		 * Update the stream's trace chunk to its parent channel's
		 * current trace chunk.
		 */
		stream->trace_chunk = stream->chan->trace_chunk;
	}

	if (stream->net_seq_idx == (uint64_t) -1ULL) {
		ret = rotate_local_stream(ctx, stream);
		if (ret < 0) {
			ERR("Failed to rotate stream, ret = %i", ret);
			goto error;
		}
	}

	if (stream->metadata_flag && stream->trace_chunk) {
		/*
		 * If the stream has transitioned to a new trace
		 * chunk, the metadata should be re-dumped to the
		 * newest chunk.
		 *
		 * However, it is possible for a stream to transition to
		 * a "no-chunk" state. This can happen if a rotation
		 * occurs on an inactive session. In such cases, the metadata
		 * regeneration will happen when the next trace chunk is
		 * created.
		 */
		ret = consumer_metadata_stream_dump(stream);
		if (ret) {
			goto error;
		}
	}
	lttng_consumer_reset_stream_rotate_state(stream);

	ret = 0;

error:
	return ret;
}

/*
 * Rotate all the ready streams now.
 *
 * This is especially important for low throughput streams that have already
 * been consumed; we cannot wait for their next packet to perform the
 * rotation.
 * Needs to be called with RCU read-side lock held to ensure existence of
 * channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
		uint64_t key, struct lttng_consumer_local_data *ctx)
{
	int ret;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;

	rcu_read_lock();

	DBG("Consumer rotate ready streams in channel %" PRIu64, key);

	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key, &iter.iter,
			stream, node_channel_id.node) {
		health_code_update();

		pthread_mutex_lock(&stream->chan->lock);
		pthread_mutex_lock(&stream->lock);

		if (!stream->rotate_ready) {
			pthread_mutex_unlock(&stream->lock);
			pthread_mutex_unlock(&stream->chan->lock);
			continue;
		}
		DBG("Consumer rotate ready stream %" PRIu64, stream->key);

		ret = lttng_consumer_rotate_stream(ctx, stream);
		pthread_mutex_unlock(&stream->lock);
		pthread_mutex_unlock(&stream->chan->lock);
		if (ret) {
			goto end;
		}
	}

	ret = 0;

end:
	rcu_read_unlock();
	return ret;
}

enum lttcomm_return_code lttng_consumer_init_command(
		struct lttng_consumer_local_data *ctx,
		const lttng_uuid sessiond_uuid)
{
	enum lttcomm_return_code ret;
	char uuid_str[LTTNG_UUID_STR_LEN];

	if (ctx->sessiond_uuid.is_set) {
		ret = LTTCOMM_CONSUMERD_ALREADY_SET;
		goto end;
	}

	ctx->sessiond_uuid.is_set = true;
	memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
	ret = LTTCOMM_CONSUMERD_SUCCESS;
	lttng_uuid_to_str(sessiond_uuid, uuid_str);
	DBG("Received session daemon UUID: %s", uuid_str);
end:
	return ret;
}

enum lttcomm_return_code lttng_consumer_create_trace_chunk(
		const uint64_t *relayd_id, uint64_t session_id,
		uint64_t chunk_id,
		time_t chunk_creation_timestamp,
		const char *chunk_override_name,
		const struct lttng_credentials *credentials,
		struct lttng_directory_handle *chunk_directory_handle)
{
	int ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL;
	enum lttng_trace_chunk_status chunk_status;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	char creation_timestamp_buffer[ISO8601_STR_LEN];
	const char *relayd_id_str = "(none)";
	const char *creation_timestamp_str;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
				"%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	/* Local protocol error. */
	assert(chunk_creation_timestamp);
	ret = time_to_iso8601_str(chunk_creation_timestamp,
			creation_timestamp_buffer,
			sizeof(creation_timestamp_buffer));
	creation_timestamp_str = !ret ? creation_timestamp_buffer :
			"(formatting error)";

	DBG("Consumer create trace chunk command: relay_id = %s"
			", session_id = %" PRIu64 ", chunk_id = %" PRIu64
			", chunk_override_name = %s"
			", chunk_creation_timestamp = %s",
			relayd_id_str, session_id, chunk_id,
			chunk_override_name ? : "(none)",
			creation_timestamp_str);

	/*
	 * The trace chunk registry, as used by the consumer daemon, implicitly
	 * owns the trace chunks. This is only needed in the consumer since
	 * the consumer has no notion of a session beyond session IDs being
	 * used to identify other objects.
	 *
	 * The lttng_trace_chunk_registry_publish() call below provides a
	 * reference which is not released; it implicitly becomes the session
	 * daemon's reference to the chunk in the consumer daemon.
	 *
	 * The lifetime of trace chunks in the consumer daemon is managed by
	 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
	 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
	 */
	created_chunk = lttng_trace_chunk_create(chunk_id,
			chunk_creation_timestamp, NULL);
	if (!created_chunk) {
		ERR("Failed to create trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	if (chunk_override_name) {
		chunk_status = lttng_trace_chunk_override_name(created_chunk,
				chunk_override_name);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	if (chunk_directory_handle) {
		chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
				credentials);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk credentials");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
		/*
		 * The consumer daemon has no ownership of the chunk output
		 * directory.
		 */
		chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
				chunk_directory_handle);
		chunk_directory_handle = NULL;
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk's directory handle");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	published_chunk = lttng_trace_chunk_registry_publish_chunk(
			consumer_data.chunk_registry, session_id,
			created_chunk);
	lttng_trace_chunk_put(created_chunk);
	created_chunk = NULL;
	if (!published_chunk) {
		ERR("Failed to publish trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
			consumer_data.channels_by_session_id_ht->hash_fct(
				&session_id, lttng_ht_seed),
			consumer_data.channels_by_session_id_ht->match_fct,
			&session_id, &iter.iter, channel,
			channels_by_session_id_ht_node.node) {
		ret = lttng_consumer_channel_set_trace_chunk(channel,
				published_chunk);
		if (ret) {
			/*
			 * Roll-back the creation of this chunk.
			 *
			 * This is important since the session daemon will
			 * assume that the creation of this chunk failed and
			 * will never ask for it to be closed, resulting
			 * in a leak and an inconsistent state for some
			 * channels.
			 */
			enum lttcomm_return_code close_ret;
			char path[LTTNG_PATH_MAX];

			DBG("Failed to set new trace chunk on existing channels, rolling back");
			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
					session_id, chunk_id,
					chunk_creation_timestamp, NULL,
					path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
						session_id, chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			break;
		}
	}

	if (relayd_id) {
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_create_trace_chunk(
					&relayd->control_sock, published_chunk);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		if (!relayd || ret) {
			enum lttcomm_return_code close_ret;
			char path[LTTNG_PATH_MAX];

			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
					session_id,
					chunk_id,
					chunk_creation_timestamp,
					NULL, path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
						session_id,
						chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
	rcu_read_unlock();
error:
	/* Release the reference returned by the "publish" operation. */
	lttng_trace_chunk_put(published_chunk);
	lttng_trace_chunk_put(created_chunk);
	return ret_code;
}
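
/*
 * Overview of the chunk creation sequence above: the chunk is created and
 * configured (name, credentials, output directory), published in the
 * per-session chunk registry, attached to every channel of the session and,
 * for network sessions, announced to the relay daemon. Any failure after
 * publication triggers a roll-back close of the chunk, since the session
 * daemon will consider the creation failed and will never ask for the chunk
 * to be closed.
 */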

enum lttcomm_return_code lttng_consumer_close_trace_chunk(
		const uint64_t *relayd_id, uint64_t session_id,
		uint64_t chunk_id, time_t chunk_close_timestamp,
		const enum lttng_trace_chunk_command_type *close_command,
		char *path)
{
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *chunk;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	const char *close_command_name = "none";
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	enum lttng_trace_chunk_status chunk_status;

	if (relayd_id) {
		int ret;

		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
				"%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}
	if (close_command) {
		close_command_name = lttng_trace_chunk_command_type_get_name(
				*close_command);
	}

	DBG("Consumer close trace chunk command: relayd_id = %s"
			", session_id = %" PRIu64 ", chunk_id = %" PRIu64
			", close command = %s",
			relayd_id_str, session_id, chunk_id,
			close_command_name);

	chunk = lttng_trace_chunk_registry_find_chunk(
			consumer_data.chunk_registry, session_id, chunk_id);
	if (!chunk) {
		ERR("Failed to find chunk: session_id = %" PRIu64
				", chunk_id = %" PRIu64,
				session_id, chunk_id);
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
			chunk_close_timestamp);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		goto end;
	}

	if (close_command) {
		chunk_status = lttng_trace_chunk_set_close_command(
				chunk, *close_command);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto end;
		}
	}

	/*
	 * chunk is now invalid to access as we no longer hold a reference to
	 * it; it is only kept around to compare it (by address) to the
	 * current chunk found in the session's channels.
	 */
	rcu_read_lock();
	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter,
			channel, node.node) {
		int ret;

		/*
		 * Only change the channel's chunk to NULL if it still
		 * references the chunk being closed. The channel may
		 * reference a newer chunk in the case of a session
		 * rotation. When a session rotation occurs, the "next"
		 * chunk is created before the "current" chunk is closed.
		 */
		if (channel->trace_chunk != chunk) {
			continue;
		}
		ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
		if (ret) {
			/*
			 * Attempt to close the chunk on as many channels as
			 * possible.
			 */
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		}
	}

	if (relayd_id) {
		int ret;
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_close_trace_chunk(
					&relayd->control_sock, chunk,
					path);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
					*relayd_id);
		}

		if (!relayd || ret) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
	rcu_read_unlock();
end:
	/*
	 * Release the reference returned by the "find" operation and
	 * the session daemon's implicit reference to the chunk.
	 */
	lttng_trace_chunk_put(chunk);
	lttng_trace_chunk_put(chunk);

	return ret_code;
}

enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
		const uint64_t *relayd_id, uint64_t session_id,
		uint64_t chunk_id)
{
	int ret;
	enum lttcomm_return_code ret_code;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	const bool is_local_trace = !relayd_id;
	struct consumer_relayd_sock_pair *relayd = NULL;
	bool chunk_exists_local, chunk_exists_remote;

	if (relayd_id) {
		int ret;

		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
				"%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	DBG("Consumer trace chunk exists command: relayd_id = %s"
			", chunk_id = %" PRIu64, relayd_id_str,
			chunk_id);
	ret = lttng_trace_chunk_registry_chunk_exists(
			consumer_data.chunk_registry, session_id,
			chunk_id, &chunk_exists_local);
	if (ret) {
		/* Internal error. */
		ERR("Failed to query the existence of a trace chunk");
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto end;
	}
	DBG("Trace chunk %s locally",
			chunk_exists_local ? "exists" : "does not exist");
	if (chunk_exists_local) {
		ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
		goto end;
	} else if (is_local_trace) {
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	rcu_read_lock();
	relayd = consumer_find_relayd(*relayd_id);
	if (!relayd) {
		ERR("Failed to find relayd %" PRIu64, *relayd_id);
		ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
		goto end_rcu_unlock;
	}
	DBG("Looking up existence of trace chunk on relay daemon");
	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
	ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
			&chunk_exists_remote);
	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	if (ret < 0) {
		ERR("Failed to look-up the existence of trace chunk on relay daemon");
		ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
		goto end_rcu_unlock;
	}

	ret_code = chunk_exists_remote ?
			LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
			LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
	DBG("Trace chunk %s on relay daemon",
			chunk_exists_remote ? "exists" : "does not exist");

end_rcu_unlock:
	rcu_read_unlock();
end:
	return ret_code;
}

static
int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	int ret;

	ht = consumer_data.stream_per_chan_id_ht;

	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key,
			&iter.iter, stream, node_channel_id.node) {
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		ret = consumer_clear_stream(stream);
		if (ret) {
			goto error_unlock;
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	rcu_read_unlock();
	return LTTCOMM_CONSUMERD_SUCCESS;

error_unlock:
	pthread_mutex_unlock(&stream->lock);
	rcu_read_unlock();
	return ret;
}

int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
{
	int ret;

	DBG("Consumer clear channel %" PRIu64, channel->key);

	if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
		/*
		 * Nothing to do for the metadata channel/stream.
		 * The snapshot mechanism already takes care of the metadata
		 * handling/generation, and monitored channels only need to
		 * have their data stream cleared.
		 */
		ret = LTTCOMM_CONSUMERD_SUCCESS;
		goto end;
	}

	if (!channel->monitor) {
		ret = consumer_clear_unmonitored_channel(channel);
	} else {
		ret = consumer_clear_monitored_channel(channel);
	}
end:
	return ret;
}