consumerd: register threads to health monitoring
[lttng-tools.git] / src / common / consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
00e2e675 4 * 2012 - David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
d14d33bf
AM
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
3bd1e081 9 *
d14d33bf
AM
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
3bd1e081 14 *
d14d33bf
AM
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
77c7c900 30#include <inttypes.h>
331744e3 31#include <signal.h>
3bd1e081 32
990570ed 33#include <common/common.h>
fb3a43a9
DG
34#include <common/utils.h>
35#include <common/compat/poll.h>
309167d2 36#include <common/index/index.h>
10a8a223 37#include <common/kernel-ctl/kernel-ctl.h>
00e2e675 38#include <common/sessiond-comm/relayd.h>
10a8a223
DG
39#include <common/sessiond-comm/sessiond-comm.h>
40#include <common/kernel-consumer/kernel-consumer.h>
00e2e675 41#include <common/relayd/relayd.h>
10a8a223 42#include <common/ust-consumer/ust-consumer.h>
d3e2ba59 43#include <common/consumer-timer.h>
10a8a223
DG
44
45#include "consumer.h"
1d1a276c 46#include "consumer-stream.h"
1fc79fb4 47#include "../bin/lttng-consumerd/health-consumerd.h"
3bd1e081
MD
48
49struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
50 .stream_count = 0,
51 .need_update = 1,
52 .type = LTTNG_CONSUMER_UNKNOWN,
53};
54
d8ef542d
MD
/* Actions that can be requested of the channel management thread. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_DEL,
	CONSUMER_CHANNEL_QUIT,
};

/* Message exchanged with the channel management thread over the channel pipe. */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan;	/* add */
	uint64_t key;				/* del */
};
66
3bd1e081
MD
/*
 * Flag to inform the polling threads to quit when all fds have hung up.
 * Updated by consumer_thread_receive_fds when it notices that all fds have
 * hung up. Also updated by the signal handler (consumer_should_exit()). Read
 * by the polling threads.
 *
 * NOTE(review): volatile does not provide inter-thread memory ordering;
 * presumably the poll/pipe wakeups act as the synchronization points here.
 */
volatile int consumer_quit;
3bd1e081 74
43c34bc3 75/*
43c34bc3
DG
76 * Global hash table containing respectively metadata and data streams. The
77 * stream element in this ht should only be updated by the metadata poll thread
78 * for the metadata and the data poll thread for the data.
79 */
40dc48e0
DG
80static struct lttng_ht *metadata_ht;
81static struct lttng_ht *data_ht;
43c34bc3 82
acdb9057
DG
/*
 * Wake up a thread blocked in a poll on the given lttng pipe by writing a
 * NULL stream pointer to it. Receivers treat the NULL entry as a hint that
 * some global state changed and they should re-enter their poll wait.
 */
static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
{
	struct lttng_consumer_stream *wakeup_token;

	assert(pipe);

	wakeup_token = NULL;
	(void) lttng_pipe_write(pipe, &wakeup_token, sizeof(wakeup_token));
}
96
d8ef542d
MD
97static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
98 struct lttng_consumer_channel *chan,
a0cbdd2e 99 uint64_t key,
d8ef542d
MD
100 enum consumer_channel_action action)
101{
102 struct consumer_channel_msg msg;
103 int ret;
104
e56251fc
DG
105 memset(&msg, 0, sizeof(msg));
106
d8ef542d
MD
107 msg.action = action;
108 msg.chan = chan;
f21dae48 109 msg.key = key;
d8ef542d
MD
110 do {
111 ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
112 } while (ret < 0 && errno == EINTR);
113}
114
a0cbdd2e
MD
/*
 * Ask the channel management thread to delete the channel identified by key.
 * The deletion is asynchronous: it happens when the thread reading the
 * channel pipe processes the message.
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
		uint64_t key)
{
	notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
}
120
d8ef542d
MD
/*
 * Read one consumer_channel_msg from the channel pipe and return its fields
 * through the out parameters, retrying on EINTR.
 *
 * Returns the raw read(2) result: sizeof(msg) on a full message, 0 on EOF,
 * negative on error.
 *
 * NOTE(review): a short read (0 < ret < sizeof(msg)) would still populate
 * *chan/*key/*action from a partially-read msg; this presumably relies on
 * pipe writes of sizeof(msg) <= PIPE_BUF being atomic — confirm.
 */
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_channel **chan,
		uint64_t *key,
		enum consumer_channel_action *action)
{
	struct consumer_channel_msg msg;
	int ret;

	do {
		ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
	} while (ret < 0 && errno == EINTR);
	if (ret > 0) {
		*action = msg.action;
		*chan = msg.chan;
		*key = msg.key;
	}
	return ret;
}
139
3bd1e081
MD
140/*
141 * Find a stream. The consumer_data.lock must be locked during this
142 * call.
143 */
d88aee68 144static struct lttng_consumer_stream *find_stream(uint64_t key,
8389e4f8 145 struct lttng_ht *ht)
3bd1e081 146{
e4421fec 147 struct lttng_ht_iter iter;
d88aee68 148 struct lttng_ht_node_u64 *node;
e4421fec 149 struct lttng_consumer_stream *stream = NULL;
3bd1e081 150
8389e4f8
DG
151 assert(ht);
152
d88aee68
DG
153 /* -1ULL keys are lookup failures */
154 if (key == (uint64_t) -1ULL) {
7ad0a0cb 155 return NULL;
7a57cf92 156 }
e4421fec 157
6065ceec
DG
158 rcu_read_lock();
159
d88aee68
DG
160 lttng_ht_lookup(ht, &key, &iter);
161 node = lttng_ht_iter_get_node_u64(&iter);
e4421fec
DG
162 if (node != NULL) {
163 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 164 }
e4421fec 165
6065ceec
DG
166 rcu_read_unlock();
167
e4421fec 168 return stream;
3bd1e081
MD
169}
170
da009f2c 171static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
7ad0a0cb
MD
172{
173 struct lttng_consumer_stream *stream;
174
04253271 175 rcu_read_lock();
ffe60014 176 stream = find_stream(key, ht);
04253271 177 if (stream) {
da009f2c 178 stream->key = (uint64_t) -1ULL;
04253271
MD
179 /*
180 * We don't want the lookup to match, but we still need
181 * to iterate on this stream when iterating over the hash table. Just
182 * change the node key.
183 */
da009f2c 184 stream->node.key = (uint64_t) -1ULL;
04253271
MD
185 }
186 rcu_read_unlock();
7ad0a0cb
MD
187}
188
d56db448
DG
189/*
190 * Return a channel object for the given key.
191 *
192 * RCU read side lock MUST be acquired before calling this function and
193 * protects the channel ptr.
194 */
d88aee68 195struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
3bd1e081 196{
e4421fec 197 struct lttng_ht_iter iter;
d88aee68 198 struct lttng_ht_node_u64 *node;
e4421fec 199 struct lttng_consumer_channel *channel = NULL;
3bd1e081 200
d88aee68
DG
201 /* -1ULL keys are lookup failures */
202 if (key == (uint64_t) -1ULL) {
7ad0a0cb 203 return NULL;
7a57cf92 204 }
e4421fec 205
d88aee68
DG
206 lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
207 node = lttng_ht_iter_get_node_u64(&iter);
e4421fec
DG
208 if (node != NULL) {
209 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 210 }
e4421fec
DG
211
212 return channel;
3bd1e081
MD
213}
214
ffe60014 215static void free_stream_rcu(struct rcu_head *head)
7ad0a0cb 216{
d88aee68
DG
217 struct lttng_ht_node_u64 *node =
218 caa_container_of(head, struct lttng_ht_node_u64, head);
ffe60014
DG
219 struct lttng_consumer_stream *stream =
220 caa_container_of(node, struct lttng_consumer_stream, node);
7ad0a0cb 221
ffe60014 222 free(stream);
7ad0a0cb
MD
223}
224
ffe60014 225static void free_channel_rcu(struct rcu_head *head)
702b1ea4 226{
d88aee68
DG
227 struct lttng_ht_node_u64 *node =
228 caa_container_of(head, struct lttng_ht_node_u64, head);
ffe60014
DG
229 struct lttng_consumer_channel *channel =
230 caa_container_of(node, struct lttng_consumer_channel, node);
702b1ea4 231
ffe60014 232 free(channel);
702b1ea4
MD
233}
234
00e2e675
DG
235/*
236 * RCU protected relayd socket pair free.
237 */
ffe60014 238static void free_relayd_rcu(struct rcu_head *head)
00e2e675 239{
d88aee68
DG
240 struct lttng_ht_node_u64 *node =
241 caa_container_of(head, struct lttng_ht_node_u64, head);
00e2e675
DG
242 struct consumer_relayd_sock_pair *relayd =
243 caa_container_of(node, struct consumer_relayd_sock_pair, node);
244
8994307f
DG
245 /*
246 * Close all sockets. This is done in the call RCU since we don't want the
247 * socket fds to be reassigned thus potentially creating bad state of the
248 * relayd object.
249 *
250 * We do not have to lock the control socket mutex here since at this stage
251 * there is no one referencing to this relayd object.
252 */
253 (void) relayd_close(&relayd->control_sock);
254 (void) relayd_close(&relayd->data_sock);
255
00e2e675
DG
256 free(relayd);
257}
258
259/*
260 * Destroy and free relayd socket pair object.
00e2e675 261 */
51230d70 262void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
263{
264 int ret;
265 struct lttng_ht_iter iter;
266
173af62f
DG
267 if (relayd == NULL) {
268 return;
269 }
270
00e2e675
DG
271 DBG("Consumer destroy and close relayd socket pair");
272
273 iter.iter.node = &relayd->node.node;
274 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
173af62f 275 if (ret != 0) {
8994307f 276 /* We assume the relayd is being or is destroyed */
173af62f
DG
277 return;
278 }
00e2e675 279
00e2e675 280 /* RCU free() call */
ffe60014
DG
281 call_rcu(&relayd->node.head, free_relayd_rcu);
282}
283
/*
 * Remove a channel from the global list protected by a mutex. This function is
 * also responsible for freeing its data structures: leftover streams, the
 * live timer, domain-specific state and, through call_rcu, the channel
 * object itself.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream, *stmp;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	/* Lock ordering: consumer_data.lock before the channel lock. */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
			send_node) {
		cds_list_del(&stream->send_node);
		/*
		 * Once a stream is added to this list, the buffers were created so
		 * we have a guarantee that this call will succeed.
		 */
		consumer_stream_destroy(stream, NULL);
	}

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}

	/* Domain-specific teardown; kernel channels need no extra work here. */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	/* Unlink from the channel hash table; must succeed for a live channel. */
	rcu_read_lock();
	iter.iter.node = &channel->node.node;
	ret = lttng_ht_del(consumer_data.channel_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	/* Memory is reclaimed only after the RCU grace period. */
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
338
228b5bf7
DG
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table.
 */
static void cleanup_relayd_ht(void)
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	rcu_read_lock();

	cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
			node.node) {
		/* Removes the element from the ht; memory is freed via call_rcu. */
		consumer_destroy_relayd(relayd);
	}

	rcu_read_unlock();

	lttng_ht_destroy(consumer_data.relayd_ht);
}
359
8994307f
DG
360/*
361 * Update the end point status of all streams having the given network sequence
362 * index (relayd index).
363 *
364 * It's atomically set without having the stream mutex locked which is fine
365 * because we handle the write/read race with a pipe wakeup for each thread.
366 */
da009f2c 367static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
8994307f
DG
368 enum consumer_endpoint_status status)
369{
370 struct lttng_ht_iter iter;
371 struct lttng_consumer_stream *stream;
372
da009f2c 373 DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
8994307f
DG
374
375 rcu_read_lock();
376
377 /* Let's begin with metadata */
378 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
379 if (stream->net_seq_idx == net_seq_idx) {
380 uatomic_set(&stream->endpoint_status, status);
381 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
382 }
383 }
384
385 /* Follow up by the data streams */
386 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
387 if (stream->net_seq_idx == net_seq_idx) {
388 uatomic_set(&stream->endpoint_status, status);
389 DBG("Delete flag set to data stream %d", stream->wait_fd);
390 }
391 }
392 rcu_read_unlock();
393}
394
/*
 * Cleanup a relayd object by flagging every associated stream for deletion,
 * destroying the object (removing it from the relayd hash table, closing the
 * sockets and freeing the memory in an RCU call).
 *
 * If a local data context is available, notify the threads that the streams'
 * state has changed.
 */
static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
		struct lttng_consumer_local_data *ctx)
{
	uint64_t netidx;

	assert(relayd);

	DBG("Cleaning up relayd sockets");

	/* Save the net sequence index before destroying the object */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	if (ctx) {
		notify_thread_lttng_pipe(ctx->consumer_data_pipe);
		notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
	}
}
435
a6ba4fe1
DG
436/*
437 * Flag a relayd socket pair for destruction. Destroy it if the refcount
438 * reaches zero.
439 *
440 * RCU read side lock MUST be aquired before calling this function.
441 */
442void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
443{
444 assert(relayd);
445
446 /* Set destroy flag for this object */
447 uatomic_set(&relayd->destroy_flag, 1);
448
449 /* Destroy the relayd if refcount is 0 */
450 if (uatomic_read(&relayd->refcount) == 0) {
51230d70 451 consumer_destroy_relayd(relayd);
a6ba4fe1
DG
452 }
453}
454
/*
 * Completely destroy a stream, removing it from every visible data structure
 * and from the given hash table if one is provided.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
466
5ab66908
MD
/*
 * XXX naming of del vs destroy is all mixed up.
 */
/* Destroy a data stream and remove it from the global data stream ht. */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}

/* Destroy a metadata stream and remove it from the metadata stream ht. */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
479
d88aee68
DG
/*
 * Allocate and initialize a new stream object.
 *
 * channel_key/stream_key seed the per-channel and stream hash table nodes;
 * relayd_id becomes the stream's net_seq_idx; type selects metadata naming
 * vs. a "<channel_name>_<cpu>" name. On failure, returns NULL and, when
 * alloc_ret is non-NULL, stores a negative errno-style code in it.
 */
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
		uint64_t stream_key,
		enum lttng_consumer_stream_state state,
		const char *channel_name,
		uid_t uid,
		gid_t gid,
		uint64_t relayd_id,
		uint64_t session_id,
		int cpu,
		int *alloc_ret,
		enum consumer_channel_type type,
		unsigned int monitor)
{
	int ret;
	struct lttng_consumer_stream *stream;

	stream = zmalloc(sizeof(*stream));
	if (stream == NULL) {
		PERROR("malloc struct lttng_consumer_stream");
		ret = -ENOMEM;
		goto end;
	}

	rcu_read_lock();

	stream->key = stream_key;
	stream->out_fd = -1;
	stream->out_fd_offset = 0;
	stream->output_written = 0;
	stream->state = state;
	stream->uid = uid;
	stream->gid = gid;
	stream->net_seq_idx = relayd_id;
	stream->session_id = session_id;
	stream->monitor = monitor;
	stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
	stream->index_fd = -1;
	pthread_mutex_init(&stream->lock, NULL);

	/* If channel is the metadata, flag this stream as metadata. */
	if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
		stream->metadata_flag = 1;
		/* Metadata is flat out. */
		strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
		/* Live rendez-vous point. */
		pthread_cond_init(&stream->metadata_rdv, NULL);
		pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
	} else {
		/*
		 * Format stream name to <channel_name>_<cpu_number>.
		 * NOTE(review): snprintf truncation (ret >= sizeof(name)) is
		 * not treated as an error; the name is still NUL-terminated,
		 * so presumably acceptable — confirm.
		 */
		ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
				channel_name, cpu);
		if (ret < 0) {
			PERROR("snprintf stream name");
			goto error;
		}
	}

	/* Key is always the wait_fd for streams. */
	lttng_ht_node_init_u64(&stream->node, stream->key);

	/* Init node per channel id key */
	lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);

	/* Init session id node with the stream session id */
	lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);

	DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
			" relayd_id %" PRIu64 ", session_id %" PRIu64,
			stream->name, stream->key, channel_key,
			stream->net_seq_idx, stream->session_id);

	rcu_read_unlock();
	return stream;

error:
	rcu_read_unlock();
	free(stream);
end:
	/* alloc_ret is only written on the failure paths. */
	if (alloc_ret) {
		*alloc_ret = ret;
	}
	return NULL;
}
563
/*
 * Add a stream to the global list protected by a mutex.
 *
 * Takes, in order, the consumer data lock, the channel lock, the channel
 * timer lock and the stream lock before inserting the stream in the data
 * stream hash table and the auxiliary lookup tables. Always returns 0.
 */
int consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;
	int ret = 0;

	assert(stream);
	assert(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	rcu_read_lock();

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	/* Secondary index: channel id -> stream. */
	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	consumer_data.stream_count++;
	consumer_data.need_update = 1;

	rcu_read_unlock();
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	return ret;
}
623
5ab66908
MD
/* Destroy a data stream via consumer_del_stream() on the global data ht. */
void consumer_del_data_stream(struct lttng_consumer_stream *stream)
{
	consumer_del_stream(stream, data_ht);
}
628
00e2e675 629/*
3f8e211f
DG
630 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
631 * be acquired before calling this.
00e2e675 632 */
d09e1200 633static int add_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
634{
635 int ret = 0;
d88aee68 636 struct lttng_ht_node_u64 *node;
00e2e675
DG
637 struct lttng_ht_iter iter;
638
ffe60014 639 assert(relayd);
00e2e675 640
00e2e675 641 lttng_ht_lookup(consumer_data.relayd_ht,
d88aee68
DG
642 &relayd->net_seq_idx, &iter);
643 node = lttng_ht_iter_get_node_u64(&iter);
00e2e675 644 if (node != NULL) {
00e2e675
DG
645 goto end;
646 }
d88aee68 647 lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
00e2e675 648
00e2e675
DG
649end:
650 return ret;
651}
652
653/*
654 * Allocate and return a consumer relayd socket.
655 */
656struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
da009f2c 657 uint64_t net_seq_idx)
00e2e675
DG
658{
659 struct consumer_relayd_sock_pair *obj = NULL;
660
da009f2c
MD
661 /* net sequence index of -1 is a failure */
662 if (net_seq_idx == (uint64_t) -1ULL) {
00e2e675
DG
663 goto error;
664 }
665
666 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
667 if (obj == NULL) {
668 PERROR("zmalloc relayd sock");
669 goto error;
670 }
671
672 obj->net_seq_idx = net_seq_idx;
673 obj->refcount = 0;
173af62f 674 obj->destroy_flag = 0;
f96e4545
MD
675 obj->control_sock.sock.fd = -1;
676 obj->data_sock.sock.fd = -1;
d88aee68 677 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
00e2e675
DG
678 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
679
680error:
681 return obj;
682}
683
684/*
685 * Find a relayd socket pair in the global consumer data.
686 *
687 * Return the object if found else NULL.
b0b335c8
MD
688 * RCU read-side lock must be held across this call and while using the
689 * returned object.
00e2e675 690 */
d88aee68 691struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
00e2e675
DG
692{
693 struct lttng_ht_iter iter;
d88aee68 694 struct lttng_ht_node_u64 *node;
00e2e675
DG
695 struct consumer_relayd_sock_pair *relayd = NULL;
696
697 /* Negative keys are lookup failures */
d88aee68 698 if (key == (uint64_t) -1ULL) {
00e2e675
DG
699 goto error;
700 }
701
d88aee68 702 lttng_ht_lookup(consumer_data.relayd_ht, &key,
00e2e675 703 &iter);
d88aee68 704 node = lttng_ht_iter_get_node_u64(&iter);
00e2e675
DG
705 if (node != NULL) {
706 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
707 }
708
00e2e675
DG
709error:
710 return relayd;
711}
712
10a50311
JD
/*
 * Find the relayd matching the stream's net sequence index and announce the
 * stream to it over the control socket, recording the returned relayd stream
 * id and taking a reference on the relayd.
 *
 * Returns 0 on success, < 0 on error (unknown relayd or relayd_add_stream
 * failure).
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
		char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);
	assert(stream->net_seq_idx != -1ULL);
	assert(path);

	/* The stream is not metadata. Get relayd reference if exists. */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock, stream->name,
				path, &stream->relayd_stream_id,
				stream->chan->tracefile_size, stream->chan->tracefile_count);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			goto end;
		}

		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
				stream->key, stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
			stream->name, stream->key, stream->net_seq_idx);

end:
	rcu_read_unlock();
	return ret;
}
758
759/*
760 * Find a relayd and close the stream
761 */
762void close_relayd_stream(struct lttng_consumer_stream *stream)
763{
764 struct consumer_relayd_sock_pair *relayd;
765
766 /* The stream is not metadata. Get relayd reference if exists. */
767 rcu_read_lock();
768 relayd = consumer_find_relayd(stream->net_seq_idx);
769 if (relayd) {
770 consumer_stream_relayd_close(stream, relayd);
771 }
772 rcu_read_unlock();
773}
774
00e2e675
DG
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Metadata goes on the control socket (after a relayd_send_metadata
 * announcement); data gets a lttcomm_relayd_data_hdr sent on the data socket.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
		size_t data_size, unsigned long padding,
		struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	assert(stream);
	assert(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information (big-endian on the wire). */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);
		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be subtracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
				sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
834
3bd1e081 835/*
ffe60014
DG
836 * Allocate and return a new lttng_consumer_channel object using the given key
837 * to initialize the hash table node.
838 *
839 * On error, return NULL.
3bd1e081 840 */
886224ff 841struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
ffe60014
DG
842 uint64_t session_id,
843 const char *pathname,
844 const char *name,
845 uid_t uid,
846 gid_t gid,
57a269f2 847 uint64_t relayd_id,
1624d5b7
JD
848 enum lttng_event_output output,
849 uint64_t tracefile_size,
2bba9e53 850 uint64_t tracefile_count,
1950109e 851 uint64_t session_id_per_pid,
ecc48a90
JD
852 unsigned int monitor,
853 unsigned int live_timer_interval)
3bd1e081
MD
854{
855 struct lttng_consumer_channel *channel;
3bd1e081 856
276b26d1 857 channel = zmalloc(sizeof(*channel));
3bd1e081 858 if (channel == NULL) {
7a57cf92 859 PERROR("malloc struct lttng_consumer_channel");
3bd1e081
MD
860 goto end;
861 }
ffe60014
DG
862
863 channel->key = key;
3bd1e081 864 channel->refcount = 0;
ffe60014 865 channel->session_id = session_id;
1950109e 866 channel->session_id_per_pid = session_id_per_pid;
ffe60014
DG
867 channel->uid = uid;
868 channel->gid = gid;
869 channel->relayd_id = relayd_id;
870 channel->output = output;
1624d5b7
JD
871 channel->tracefile_size = tracefile_size;
872 channel->tracefile_count = tracefile_count;
2bba9e53 873 channel->monitor = monitor;
ecc48a90 874 channel->live_timer_interval = live_timer_interval;
a9838785 875 pthread_mutex_init(&channel->lock, NULL);
ec6ea7d0 876 pthread_mutex_init(&channel->timer_lock, NULL);
ffe60014 877
07b86b52
JD
878 /*
879 * In monitor mode, the streams associated with the channel will be put in
880 * a special list ONLY owned by this channel. So, the refcount is set to 1
881 * here meaning that the channel itself has streams that are referenced.
882 *
883 * On a channel deletion, once the channel is no longer visible, the
884 * refcount is decremented and checked for a zero value to delete it. With
885 * streams in no monitor mode, it will now be safe to destroy the channel.
886 */
887 if (!channel->monitor) {
888 channel->refcount = 1;
889 }
890
ffe60014
DG
891 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
892 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
893
894 strncpy(channel->name, name, sizeof(channel->name));
895 channel->name[sizeof(channel->name) - 1] = '\0';
896
d88aee68 897 lttng_ht_node_init_u64(&channel->node, channel->key);
d8ef542d
MD
898
899 channel->wait_fd = -1;
900
ffe60014
DG
901 CDS_INIT_LIST_HEAD(&channel->streams.head);
902
d88aee68 903 DBG("Allocated channel (key %" PRIu64 ")", channel->key)
3bd1e081 904
3bd1e081
MD
905end:
906 return channel;
907}
908
/*
 * Add a channel to the global list protected by a mutex.
 *
 * On success 0 is returned else a negative value (-EEXIST when a channel
 * with the same key is already registered). On success, the channel thread
 * is notified through the channel pipe for data channels with a valid
 * wait_fd.
 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	int ret = 0;
	struct lttng_ht_node_u64 *node;
	struct lttng_ht_iter iter;

	/* Lock ordering: consumer data lock, channel lock, then timer lock. */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&channel->timer_lock);
	rcu_read_lock();

	lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		/* Channel already exist. Ignore the insertion */
		ERR("Consumer add channel key %" PRIu64 " already exists!",
			channel->key);
		ret = -EEXIST;
		goto end;
	}

	lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);

end:
	rcu_read_unlock();
	pthread_mutex_unlock(&channel->timer_lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	/* Notification is sent outside the locks to avoid pipe-write while held. */
	if (!ret && channel->wait_fd != -1 &&
			channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
		notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
	}
	return ret;
}
950
951/*
952 * Allocate the pollfd structure and the local view of the out fds to avoid
953 * doing a lookup in the linked list and concurrency issues when writing is
954 * needed. Called with consumer_data.lock held.
955 *
956 * Returns the number of fds in the structures.
957 */
ffe60014
DG
958static int update_poll_array(struct lttng_consumer_local_data *ctx,
959 struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
960 struct lttng_ht *ht)
3bd1e081 961{
3bd1e081 962 int i = 0;
e4421fec
DG
963 struct lttng_ht_iter iter;
964 struct lttng_consumer_stream *stream;
3bd1e081 965
ffe60014
DG
966 assert(ctx);
967 assert(ht);
968 assert(pollfd);
969 assert(local_stream);
970
3bd1e081 971 DBG("Updating poll fd array");
481d6c57 972 rcu_read_lock();
43c34bc3 973 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
974 /*
975 * Only active streams with an active end point can be added to the
976 * poll set and local stream storage of the thread.
977 *
978 * There is a potential race here for endpoint_status to be updated
979 * just after the check. However, this is OK since the stream(s) will
980 * be deleted once the thread is notified that the end point state has
981 * changed where this function will be called back again.
982 */
983 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
79d4ffb7 984 stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
3bd1e081
MD
985 continue;
986 }
7972aab2
DG
987 /*
988 * This clobbers way too much the debug output. Uncomment that if you
989 * need it for debugging purposes.
990 *
991 * DBG("Active FD %d", stream->wait_fd);
992 */
e4421fec 993 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 994 (*pollfd)[i].events = POLLIN | POLLPRI;
e4421fec 995 local_stream[i] = stream;
3bd1e081
MD
996 i++;
997 }
481d6c57 998 rcu_read_unlock();
3bd1e081
MD
999
1000 /*
50f8ae69 1001 * Insert the consumer_data_pipe at the end of the array and don't
3bd1e081
MD
1002 * increment i so nb_fd is the number of real FD.
1003 */
acdb9057 1004 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
509bb1cf 1005 (*pollfd)[i].events = POLLIN | POLLPRI;
3bd1e081
MD
1006 return i;
1007}
1008
/*
 * Wait on both the should_quit pipe and the command socket.
 *
 * Returns 0 when data is available on the command socket, -1 on poll
 * error or when a quit was requested.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	for (;;) {
		num_rdy = poll(consumer_sockpoll, 2, -1);
		if (num_rdy >= 0) {
			break;
		}
		if (errno == EINTR) {
			/* Interrupted system call; simply retry. */
			continue;
		}
		PERROR("Poll error");
		return -1;
	}
	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		/* The quit pipe fired: report it as an exit condition. */
		DBG("consumer_should_quit wake up");
		return -1;
	}
	return 0;
}
1038
1039/*
1040 * Set the error socket.
1041 */
ffe60014
DG
1042void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
1043 int sock)
3bd1e081
MD
1044{
1045 ctx->consumer_error_socket = sock;
1046}
1047
1048/*
1049 * Set the command socket path.
1050 */
3bd1e081
MD
1051void lttng_consumer_set_command_sock_path(
1052 struct lttng_consumer_local_data *ctx, char *sock)
1053{
1054 ctx->consumer_command_sock_path = sock;
1055}
1056
1057/*
1058 * Send return code to the session daemon.
1059 * If the socket is not defined, we return 0, it is not a fatal error
1060 */
ffe60014 1061int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
3bd1e081
MD
1062{
1063 if (ctx->consumer_error_socket > 0) {
1064 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
1065 sizeof(enum lttcomm_sessiond_command));
1066 }
1067
1068 return 0;
1069}
1070
1071/*
228b5bf7
DG
1072 * Close all the tracefiles and stream fds and MUST be called when all
1073 * instances are destroyed i.e. when all threads were joined and are ended.
3bd1e081
MD
1074 */
1075void lttng_consumer_cleanup(void)
1076{
e4421fec 1077 struct lttng_ht_iter iter;
ffe60014 1078 struct lttng_consumer_channel *channel;
6065ceec
DG
1079
1080 rcu_read_lock();
3bd1e081 1081
ffe60014
DG
1082 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
1083 node.node) {
702b1ea4 1084 consumer_del_channel(channel);
3bd1e081 1085 }
6065ceec
DG
1086
1087 rcu_read_unlock();
d6ce1df2 1088
d6ce1df2 1089 lttng_ht_destroy(consumer_data.channel_ht);
228b5bf7
DG
1090
1091 cleanup_relayd_ht();
1092
d8ef542d
MD
1093 lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);
1094
228b5bf7
DG
1095 /*
1096 * This HT contains streams that are freed by either the metadata thread or
1097 * the data thread so we do *nothing* on the hash table and simply destroy
1098 * it.
1099 */
1100 lttng_ht_destroy(consumer_data.stream_list_ht);
3bd1e081
MD
1101}
1102
1103/*
1104 * Called from signal handler.
1105 */
1106void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1107{
1108 int ret;
1109 consumer_quit = 1;
6f94560a
MD
1110 do {
1111 ret = write(ctx->consumer_should_quit[1], "4", 1);
1112 } while (ret < 0 && errno == EINTR);
4cec016f 1113 if (ret < 0 || ret != 1) {
7a57cf92 1114 PERROR("write consumer quit");
3bd1e081 1115 }
ab1027f4
DG
1116
1117 DBG("Consumer flag that it should quit");
3bd1e081
MD
1118}
1119
/*
 * Flush to disk the pages of the subbuffer(s) written before orig_offset
 * and hint the kernel to drop them from the page cache. Bounds the page
 * cache footprint of stream output files.
 */
void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
		off_t orig_offset)
{
	int outfd = stream->out_fd;

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset < stream->max_sb_size) {
		/* Nothing before the first subbuffer: nothing to sync. */
		return;
	}
	lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
			stream->max_sb_size,
			SYNC_FILE_RANGE_WAIT_BEFORE
			| SYNC_FILE_RANGE_WRITE
			| SYNC_FILE_RANGE_WAIT_AFTER);
	/*
	 * Give hints to the kernel about how we access the file:
	 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
	 * we write it.
	 *
	 * We need to call fadvise again after the file grows because the
	 * kernel does not seem to apply fadvise to non-existing parts of the
	 * file.
	 *
	 * Call fadvise _after_ having waited for the page writeback to
	 * complete because the dirty page writeback semantic is not well
	 * defined. So it can be expected to lead to lower throughput in
	 * streaming.
	 */
	posix_fadvise(outfd, orig_offset - stream->max_sb_size,
			stream->max_sb_size, POSIX_FADV_DONTNEED);
}
1156
/*
 * Initialise the necessary environnement :
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument, this function is called when data is
 * available on a buffer. This function is responsible to do the
 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
 * buffer configuration and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *lttng_consumer_create(
		enum lttng_consumer_type type,
		ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
			struct lttng_consumer_local_data *ctx),
		int (*recv_channel)(struct lttng_consumer_channel *channel),
		int (*recv_stream)(struct lttng_consumer_stream *stream),
		int (*update_stream)(uint64_t stream_key, uint32_t state))
{
	int ret;
	struct lttng_consumer_local_data *ctx;

	/* A process hosts one consumer type only; it must never change. */
	assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
			consumer_data.type == type);
	consumer_data.type = type;

	ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
	if (ctx == NULL) {
		PERROR("allocating context");
		goto error;
	}

	/* Sockets are set later; mark them invalid for now. */
	ctx->consumer_error_socket = -1;
	ctx->consumer_metadata_socket = -1;
	pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ctx->consumer_data_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_data_pipe) {
		goto error_poll_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		PERROR("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_thread_pipe);
	if (ret < 0) {
		PERROR("Error creating thread pipe");
		goto error_thread_pipe;
	}

	ret = pipe(ctx->consumer_channel_pipe);
	if (ret < 0) {
		PERROR("Error creating channel pipe");
		goto error_channel_pipe;
	}

	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_metadata_pipe) {
		goto error_metadata_pipe;
	}

	ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
	if (ret < 0) {
		goto error_splice_pipe;
	}

	return ctx;

	/* Unwind strictly in reverse order of creation. */
error_splice_pipe:
	lttng_pipe_destroy(ctx->consumer_metadata_pipe);
error_metadata_pipe:
	utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
	utils_close_pipe(ctx->consumer_thread_pipe);
error_thread_pipe:
	utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
	lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
	free(ctx);
error:
	return NULL;
}
1251
1252/*
1253 * Close all fds associated with the instance and free the context.
1254 */
1255void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1256{
4c462e79
MD
1257 int ret;
1258
ab1027f4
DG
1259 DBG("Consumer destroying it. Closing everything.");
1260
4c462e79
MD
1261 ret = close(ctx->consumer_error_socket);
1262 if (ret) {
1263 PERROR("close");
1264 }
331744e3
JD
1265 ret = close(ctx->consumer_metadata_socket);
1266 if (ret) {
1267 PERROR("close");
1268 }
d8ef542d
MD
1269 utils_close_pipe(ctx->consumer_thread_pipe);
1270 utils_close_pipe(ctx->consumer_channel_pipe);
acdb9057 1271 lttng_pipe_destroy(ctx->consumer_data_pipe);
13886d2d 1272 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
d8ef542d 1273 utils_close_pipe(ctx->consumer_should_quit);
fb3a43a9
DG
1274 utils_close_pipe(ctx->consumer_splice_metadata_pipe);
1275
3bd1e081
MD
1276 unlink(ctx->consumer_command_sock_path);
1277 free(ctx);
1278}
1279
/*
 * Write the relayd metadata payload header (stream id + padding size, in
 * big-endian) on the given file descriptor, ahead of the metadata payload.
 * The relayd parameter is unused here; kept for signature symmetry with
 * the data path helpers.
 *
 * Returns sizeof(hdr) on success, -1 on error.
 */
static int write_relayd_metadata_id(int fd,
		struct lttng_consumer_stream *stream,
		struct consumer_relayd_sock_pair *relayd, unsigned long padding)
{
	int ret;
	struct lttcomm_relayd_metadata_payload hdr;

	hdr.stream_id = htobe64(stream->relayd_stream_id);
	hdr.padding_size = htobe32(padding);
	do {
		/* Retry the write when interrupted by a signal. */
		ret = write(fd, (void *) &hdr, sizeof(hdr));
	} while (ret < 0 && errno == EINTR);
	if (ret < 0 || ret != sizeof(hdr)) {
		/*
		 * This error means that the fd's end is closed so ignore the perror
		 * not to clutter the error output since this can happen in a normal
		 * code path.
		 */
		if (errno != EPIPE) {
			PERROR("write metadata stream id");
		}
		DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
		/*
		 * Set ret to a negative value because if ret != sizeof(hdr), we don't
		 * handle writing the missing part so report that as an error and
		 * don't lie to the caller.
		 */
		ret = -1;
		goto end;
	}
	DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
			stream->relayd_stream_id, padding);

end:
	return ret;
}
1319
/*
 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
 * core function for writing trace buffers to either the local filesystem or
 * the network.
 *
 * It must be called with the stream lock held.
 *
 * Careful review MUST be put if any changes occur!
 *
 * Returns the number of bytes written
 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len,
		unsigned long padding,
		struct lttng_packet_index *index)
{
	unsigned long mmap_offset;
	void *mmap_base;
	ssize_t ret = 0, written = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;
	unsigned int relayd_hang_up = 0;

	/* RCU lock for the relayd pointer */
	rcu_read_lock();

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			/* Relayd went away: nothing to write to. */
			ret = -EPIPE;
			goto end;
		}
	}

	/* get the offset inside the fd to mmap */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		mmap_base = stream->mmap_base;
		ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
		if (ret != 0) {
			PERROR("tracer ctl get_mmap_read_offset");
			written = -errno;
			goto end;
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		mmap_base = lttng_ustctl_get_mmap_base(stream);
		if (!mmap_base) {
			ERR("read mmap get mmap base for stream %s", stream->name);
			written = -EPERM;
			goto end;
		}
		ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
		if (ret != 0) {
			PERROR("tracer ctl get_mmap_read_offset");
			written = ret;
			goto end;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = len;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			netlen += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, netlen, padding, relayd);
		if (ret >= 0) {
			/* Use the returned socket. */
			outfd = ret;

			/* Write metadata stream id before payload */
			if (stream->metadata_flag) {
				ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
				if (ret < 0) {
					written = ret;
					/* Socket operation failed. We consider the relayd dead */
					if (ret == -EPIPE || ret == -EINVAL) {
						relayd_hang_up = 1;
						goto write_error;
					}
					goto end;
				}
			}
		} else {
			/* Socket operation failed. We consider the relayd dead */
			if (ret == -EPIPE || ret == -EINVAL) {
				relayd_hang_up = 1;
				goto write_error;
			}
			/* Else, use the default set before which is the filesystem. */
		}
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
				(stream->tracefile_size_current + len) >
				stream->chan->tracefile_size) {
			ret = utils_rotate_stream_file(stream->chan->pathname,
					stream->name, stream->chan->tracefile_size,
					stream->chan->tracefile_count, stream->uid, stream->gid,
					stream->out_fd, &(stream->tracefile_count_current),
					&stream->out_fd);
			if (ret < 0) {
				ERR("Rotating output file");
				goto end;
			}
			outfd = stream->out_fd;

			/* Recreate the index file to follow the rotated tracefile. */
			if (stream->index_fd >= 0) {
				ret = index_create_file(stream->chan->pathname,
						stream->name, stream->uid, stream->gid,
						stream->chan->tracefile_size,
						stream->tracefile_count_current);
				if (ret < 0) {
					goto end;
				}
				stream->index_fd = ret;
			}

			/* Reset current size because we just perform a rotation. */
			stream->tracefile_size_current = 0;
			stream->out_fd_offset = 0;
			orig_offset = 0;
		}
		stream->tracefile_size_current += len;
		if (index) {
			index->offset = htobe64(stream->out_fd_offset);
		}
	}

	while (len > 0) {
		do {
			/* Retry the write when interrupted by a signal. */
			ret = write(outfd, mmap_base + mmap_offset, len);
		} while (ret < 0 && errno == EINTR);
		DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
		if (ret < 0) {
			/*
			 * This is possible if the fd is closed on the other side (outfd)
			 * or any write problem. It can be verbose a bit for a normal
			 * execution if for instance the relayd is stopped abruptly. This
			 * can happen so set this to a DBG statement.
			 */
			DBG("Error in file write mmap");
			if (written == 0) {
				written = -errno;
			}
			/* Socket operation failed. We consider the relayd dead */
			if (errno == EPIPE || errno == EINVAL) {
				relayd_hang_up = 1;
				goto write_error;
			}
			goto end;
		} else if (ret > len) {
			PERROR("Error in file write (ret %zd > len %lu)", ret, len);
			written += ret;
			goto end;
		} else {
			len -= ret;
			mmap_offset += ret;
		}

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
					SYNC_FILE_RANGE_WRITE);
			stream->out_fd_offset += ret;
		}
		stream->output_written += ret;
		written += ret;
	}
	lttng_consumer_sync_trace_file(stream, orig_offset);

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		cleanup_relayd(relayd, ctx);
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return written;
}
1533
1534/*
1535 * Splice the data from the ring buffer to the tracefile.
1536 *
79d4ffb7
DG
1537 * It must be called with the stream lock held.
1538 *
3bd1e081
MD
1539 * Returns the number of bytes spliced.
1540 */
4078b776 1541ssize_t lttng_consumer_on_read_subbuffer_splice(
3bd1e081 1542 struct lttng_consumer_local_data *ctx,
1d4dfdef 1543 struct lttng_consumer_stream *stream, unsigned long len,
309167d2
JD
1544 unsigned long padding,
1545 struct lttng_packet_index *index)
3bd1e081 1546{
f02e1e8a
DG
1547 ssize_t ret = 0, written = 0, ret_splice = 0;
1548 loff_t offset = 0;
1549 off_t orig_offset = stream->out_fd_offset;
1550 int fd = stream->wait_fd;
1551 /* Default is on the disk */
1552 int outfd = stream->out_fd;
f02e1e8a 1553 struct consumer_relayd_sock_pair *relayd = NULL;
fb3a43a9 1554 int *splice_pipe;
8994307f 1555 unsigned int relayd_hang_up = 0;
f02e1e8a 1556
3bd1e081
MD
1557 switch (consumer_data.type) {
1558 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1559 break;
7753dea8
MD
1560 case LTTNG_CONSUMER32_UST:
1561 case LTTNG_CONSUMER64_UST:
f02e1e8a 1562 /* Not supported for user space tracing */
3bd1e081
MD
1563 return -ENOSYS;
1564 default:
1565 ERR("Unknown consumer_data type");
1566 assert(0);
3bd1e081
MD
1567 }
1568
f02e1e8a
DG
1569 /* RCU lock for the relayd pointer */
1570 rcu_read_lock();
1571
1572 /* Flag that the current stream if set for network streaming. */
da009f2c 1573 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a
DG
1574 relayd = consumer_find_relayd(stream->net_seq_idx);
1575 if (relayd == NULL) {
56591bac 1576 ret = -EPIPE;
f02e1e8a
DG
1577 goto end;
1578 }
1579 }
1580
fb3a43a9
DG
1581 /*
1582 * Choose right pipe for splice. Metadata and trace data are handled by
1583 * different threads hence the use of two pipes in order not to race or
1584 * corrupt the written data.
1585 */
1586 if (stream->metadata_flag) {
1587 splice_pipe = ctx->consumer_splice_metadata_pipe;
1588 } else {
1589 splice_pipe = ctx->consumer_thread_pipe;
1590 }
1591
f02e1e8a 1592 /* Write metadata stream id before payload */
1d4dfdef
DG
1593 if (relayd) {
1594 int total_len = len;
f02e1e8a 1595
1d4dfdef
DG
1596 if (stream->metadata_flag) {
1597 /*
1598 * Lock the control socket for the complete duration of the function
1599 * since from this point on we will use the socket.
1600 */
1601 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1602
1603 ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
1604 padding);
1605 if (ret < 0) {
1606 written = ret;
8994307f
DG
1607 /* Socket operation failed. We consider the relayd dead */
1608 if (ret == -EBADF) {
1609 WARN("Remote relayd disconnected. Stopping");
1610 relayd_hang_up = 1;
1611 goto write_error;
1612 }
1d4dfdef
DG
1613 goto end;
1614 }
1615
1616 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1617 }
1618
1619 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1620 if (ret >= 0) {
1621 /* Use the returned socket. */
1622 outfd = ret;
1623 } else {
8994307f
DG
1624 /* Socket operation failed. We consider the relayd dead */
1625 if (ret == -EBADF) {
1626 WARN("Remote relayd disconnected. Stopping");
1627 relayd_hang_up = 1;
1628 goto write_error;
1629 }
f02e1e8a
DG
1630 goto end;
1631 }
1d4dfdef
DG
1632 } else {
1633 /* No streaming, we have to set the len with the full padding */
1634 len += padding;
1624d5b7
JD
1635
1636 /*
1637 * Check if we need to change the tracefile before writing the packet.
1638 */
1639 if (stream->chan->tracefile_size > 0 &&
1640 (stream->tracefile_size_current + len) >
1641 stream->chan->tracefile_size) {
fe4477ee
JD
1642 ret = utils_rotate_stream_file(stream->chan->pathname,
1643 stream->name, stream->chan->tracefile_size,
1644 stream->chan->tracefile_count, stream->uid, stream->gid,
309167d2
JD
1645 stream->out_fd, &(stream->tracefile_count_current),
1646 &stream->out_fd);
1624d5b7
JD
1647 if (ret < 0) {
1648 ERR("Rotating output file");
1649 goto end;
1650 }
309167d2
JD
1651 outfd = stream->out_fd;
1652
1653 if (stream->index_fd >= 0) {
1654 ret = index_create_file(stream->chan->pathname,
1655 stream->name, stream->uid, stream->gid,
1656 stream->chan->tracefile_size,
1657 stream->tracefile_count_current);
1658 if (ret < 0) {
1659 goto end;
1660 }
1661 stream->index_fd = ret;
1662 }
1663
a6976990
DG
1664 /* Reset current size because we just perform a rotation. */
1665 stream->tracefile_size_current = 0;
a1ae300f
JD
1666 stream->out_fd_offset = 0;
1667 orig_offset = 0;
1624d5b7
JD
1668 }
1669 stream->tracefile_size_current += len;
309167d2 1670 index->offset = htobe64(stream->out_fd_offset);
f02e1e8a
DG
1671 }
1672
1673 while (len > 0) {
1d4dfdef
DG
1674 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1675 (unsigned long)offset, len, fd, splice_pipe[1]);
fb3a43a9 1676 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
f02e1e8a
DG
1677 SPLICE_F_MOVE | SPLICE_F_MORE);
1678 DBG("splice chan to pipe, ret %zd", ret_splice);
1679 if (ret_splice < 0) {
1680 PERROR("Error in relay splice");
1681 if (written == 0) {
1682 written = ret_splice;
1683 }
1684 ret = errno;
1685 goto splice_error;
1686 }
1687
1688 /* Handle stream on the relayd if the output is on the network */
1689 if (relayd) {
1690 if (stream->metadata_flag) {
1d4dfdef
DG
1691 size_t metadata_payload_size =
1692 sizeof(struct lttcomm_relayd_metadata_payload);
1693
f02e1e8a 1694 /* Update counter to fit the spliced data */
1d4dfdef
DG
1695 ret_splice += metadata_payload_size;
1696 len += metadata_payload_size;
f02e1e8a
DG
1697 /*
1698 * We do this so the return value can match the len passed as
1699 * argument to this function.
1700 */
1d4dfdef 1701 written -= metadata_payload_size;
f02e1e8a
DG
1702 }
1703 }
1704
1705 /* Splice data out */
fb3a43a9 1706 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
f02e1e8a 1707 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
1d4dfdef 1708 DBG("Consumer splice pipe to file, ret %zd", ret_splice);
f02e1e8a
DG
1709 if (ret_splice < 0) {
1710 PERROR("Error in file splice");
1711 if (written == 0) {
1712 written = ret_splice;
1713 }
8994307f 1714 /* Socket operation failed. We consider the relayd dead */
00c8752b 1715 if (errno == EBADF || errno == EPIPE) {
8994307f
DG
1716 WARN("Remote relayd disconnected. Stopping");
1717 relayd_hang_up = 1;
1718 goto write_error;
1719 }
f02e1e8a
DG
1720 ret = errno;
1721 goto splice_error;
1722 } else if (ret_splice > len) {
1723 errno = EINVAL;
1724 PERROR("Wrote more data than requested %zd (len: %lu)",
1725 ret_splice, len);
1726 written += ret_splice;
1727 ret = errno;
1728 goto splice_error;
1729 }
1730 len -= ret_splice;
1731
1732 /* This call is useless on a socket so better save a syscall. */
1733 if (!relayd) {
1734 /* This won't block, but will start writeout asynchronously */
1735 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
1736 SYNC_FILE_RANGE_WRITE);
1737 stream->out_fd_offset += ret_splice;
1738 }
e5d1a9b3 1739 stream->output_written += ret_splice;
f02e1e8a
DG
1740 written += ret_splice;
1741 }
1742 lttng_consumer_sync_trace_file(stream, orig_offset);
1743
1744 ret = ret_splice;
1745
1746 goto end;
1747
8994307f
DG
1748write_error:
1749 /*
1750 * This is a special case that the relayd has closed its socket. Let's
1751 * cleanup the relayd object and all associated streams.
1752 */
1753 if (relayd && relayd_hang_up) {
1754 cleanup_relayd(relayd, ctx);
1755 /* Skip splice error so the consumer does not fail */
1756 goto end;
1757 }
1758
f02e1e8a
DG
1759splice_error:
1760 /* send the appropriate error description to sessiond */
1761 switch (ret) {
f02e1e8a 1762 case EINVAL:
f73fabfd 1763 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
1764 break;
1765 case ENOMEM:
f73fabfd 1766 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
1767 break;
1768 case ESPIPE:
f73fabfd 1769 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
1770 break;
1771 }
1772
1773end:
1774 if (relayd && stream->metadata_flag) {
1775 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1776 }
1777
1778 rcu_read_unlock();
1779 return written;
3bd1e081
MD
1780}
1781
1782/*
1783 * Take a snapshot for a specific fd
1784 *
1785 * Returns 0 on success, < 0 on error
1786 */
ffe60014 1787int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
1788{
1789 switch (consumer_data.type) {
1790 case LTTNG_CONSUMER_KERNEL:
ffe60014 1791 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
1792 case LTTNG_CONSUMER32_UST:
1793 case LTTNG_CONSUMER64_UST:
ffe60014 1794 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
1795 default:
1796 ERR("Unknown consumer_data type");
1797 assert(0);
1798 return -ENOSYS;
1799 }
3bd1e081
MD
1800}
1801
1802/*
1803 * Get the produced position
1804 *
1805 * Returns 0 on success, < 0 on error
1806 */
ffe60014 1807int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
1808 unsigned long *pos)
1809{
1810 switch (consumer_data.type) {
1811 case LTTNG_CONSUMER_KERNEL:
ffe60014 1812 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
1813 case LTTNG_CONSUMER32_UST:
1814 case LTTNG_CONSUMER64_UST:
ffe60014 1815 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
1816 default:
1817 ERR("Unknown consumer_data type");
1818 assert(0);
1819 return -ENOSYS;
1820 }
1821}
1822
1823int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1824 int sock, struct pollfd *consumer_sockpoll)
1825{
1826 switch (consumer_data.type) {
1827 case LTTNG_CONSUMER_KERNEL:
1828 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
1829 case LTTNG_CONSUMER32_UST:
1830 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1831 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1832 default:
1833 ERR("Unknown consumer_data type");
1834 assert(0);
1835 return -ENOSYS;
1836 }
1837}
1838
43c34bc3
DG
1839/*
1840 * Iterate over all streams of the hashtable and free them properly.
1841 *
1842 * WARNING: *MUST* be used with data stream only.
1843 */
1844static void destroy_data_stream_ht(struct lttng_ht *ht)
1845{
43c34bc3
DG
1846 struct lttng_ht_iter iter;
1847 struct lttng_consumer_stream *stream;
1848
1849 if (ht == NULL) {
1850 return;
1851 }
1852
1853 rcu_read_lock();
1854 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
5c540210
DG
1855 /*
1856 * Ignore return value since we are currently cleaning up so any error
1857 * can't be handled.
1858 */
1859 (void) consumer_del_stream(stream, ht);
43c34bc3
DG
1860 }
1861 rcu_read_unlock();
1862
1863 lttng_ht_destroy(ht);
1864}
1865
fb3a43a9 1866/*
f724d81e 1867 * Iterate over all streams of the hashtable and free them properly.
e316aad5
DG
1868 *
1869 * XXX: Should not be only for metadata stream or else use an other name.
fb3a43a9
DG
1870 */
1871static void destroy_stream_ht(struct lttng_ht *ht)
1872{
fb3a43a9
DG
1873 struct lttng_ht_iter iter;
1874 struct lttng_consumer_stream *stream;
1875
1876 if (ht == NULL) {
1877 return;
1878 }
1879
d09e1200 1880 rcu_read_lock();
58b1f425 1881 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
5c540210
DG
1882 /*
1883 * Ignore return value since we are currently cleaning up so any error
1884 * can't be handled.
1885 */
1886 (void) consumer_del_metadata_stream(stream, ht);
fb3a43a9 1887 }
d09e1200 1888 rcu_read_unlock();
fb3a43a9
DG
1889
1890 lttng_ht_destroy(ht);
1891}
1892
/*
 * Close the metadata production side according to the consumer type.
 *
 * For the kernel consumer this is a no-op: the session daemon owns and
 * closes kernel metadata streams. For UST (32/64-bit), the metadata hash
 * table is walked and each stream's wakeup fd is closed, which is safe at
 * this point because the metadata producer is either dead or blocked.
 */
void lttng_consumer_close_metadata(void)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * The Kernel consumer has a different metadata scheme so we don't
		 * close anything because the stream will be closed by the session
		 * daemon.
		 */
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Close all metadata streams. The metadata hash table is passed and
		 * this call iterates over it by closing all wakeup fd. This is safe
		 * because at this point we are sure that the metadata producer is
		 * either dead or blocked.
		 */
		lttng_ustconsumer_close_metadata(metadata_ht);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}
}
1918
/*
 * Clean up a metadata stream and free its memory.
 *
 * The stream is removed from every hash table it was published in, its
 * file descriptors are closed, the relayd (if any) is notified and the
 * stream memory is reclaimed through call_rcu(). May also trigger channel
 * deletion when this was the channel's last reference.
 *
 * Lock order (acquired): consumer_data.lock -> stream->chan->lock ->
 * stream->lock. Must only ever be called with a metadata stream.
 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *free_chan = NULL;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	assert(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	if (ht == NULL) {
		/* Means the stream was allocated but not successfully added */
		goto free_stream_rcu;
	}

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->lock);

	/* Consumer-type specific resource release (mmap, wait/pipe fds). */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		if (stream->mmap_base != NULL) {
			ret = munmap(stream->mmap_base, stream->mmap_len);
			if (ret != 0) {
				PERROR("munmap metadata stream");
			}
		}
		if (stream->wait_fd >= 0) {
			ret = close(stream->wait_fd);
			if (ret < 0) {
				PERROR("close kernel metadata wait_fd");
			}
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		if (stream->monitor) {
			/* close the write-side in close_metadata */
			ret = close(stream->ust_metadata_poll_pipe[0]);
			if (ret < 0) {
				PERROR("Close UST metadata read-side poll pipe");
			}
		}
		lttng_ustconsumer_del_stream(stream);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	/* Unpublish the stream from the three global hash tables. */
	rcu_read_lock();
	iter.iter.node = &stream->node.node;
	ret = lttng_ht_del(ht, &iter);
	assert(!ret);

	iter.iter.node = &stream->node_channel_id.node;
	ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
	assert(!ret);

	iter.iter.node = &stream->node_session_id.node;
	ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	if (stream->out_fd >= 0) {
		ret = close(stream->out_fd);
		if (ret) {
			PERROR("close");
		}
	}

	/* Check and cleanup relayd */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		uatomic_dec(&relayd->refcount);
		assert(uatomic_read(&relayd->refcount) >= 0);

		/* Closing streams requires to lock the control socket. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_send_close_stream(&relayd->control_sock,
				stream->relayd_stream_id, stream->next_net_seq_num - 1);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			DBG("Unable to close stream on the relayd. Continuing");
			/*
			 * Continue here. There is nothing we can do for the relayd.
			 * Chances are that the relayd has closed the socket so we just
			 * continue cleaning up.
			 */
		}

		/* Both conditions are met, we destroy the relayd. */
		if (uatomic_read(&relayd->refcount) == 0 &&
				uatomic_read(&relayd->destroy_flag)) {
			consumer_destroy_relayd(relayd);
		}
	}
	rcu_read_unlock();

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&stream->chan->refcount, 1)
			&& !uatomic_read(&stream->chan->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_chan = stream->chan;
	}

end:
	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for
	 * a NULL pointer value.
	 */
	stream->chan->metadata_stream = NULL;

	/* Release in reverse order of acquisition. */
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	if (free_chan) {
		consumer_del_channel(free_chan);
	}

free_stream_rcu:
	/* Memory is reclaimed after the current RCU grace period. */
	call_rcu(&stream->node.head, free_stream_rcu);
}
2056
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Publishes the stream in the metadata hash table, the per-channel-id table
 * and the per-session stream list, and consumes one unit of the channel's
 * nb_init_stream_left counter when applicable.
 *
 * Lock order (acquired): consumer_data.lock -> chan->lock ->
 * chan->timer_lock -> stream->lock. Always returns 0.
 */
int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = metadata_ht;
	int ret = 0;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	assert(stream);
	assert(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	rcu_read_lock();

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	assert(!node);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
		&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	rcu_read_unlock();

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&consumer_data.lock);
	return ret;
}
2126
8994307f
DG
2127/*
2128 * Delete data stream that are flagged for deletion (endpoint_status).
2129 */
2130static void validate_endpoint_status_data_stream(void)
2131{
2132 struct lttng_ht_iter iter;
2133 struct lttng_consumer_stream *stream;
2134
2135 DBG("Consumer delete flagged data stream");
2136
2137 rcu_read_lock();
2138 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
2139 /* Validate delete flag of the stream */
79d4ffb7 2140 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
8994307f
DG
2141 continue;
2142 }
2143 /* Delete it right now */
2144 consumer_del_stream(stream, data_ht);
2145 }
2146 rcu_read_unlock();
2147}
2148
2149/*
2150 * Delete metadata stream that are flagged for deletion (endpoint_status).
2151 */
2152static void validate_endpoint_status_metadata_stream(
2153 struct lttng_poll_event *pollset)
2154{
2155 struct lttng_ht_iter iter;
2156 struct lttng_consumer_stream *stream;
2157
2158 DBG("Consumer delete flagged metadata stream");
2159
2160 assert(pollset);
2161
2162 rcu_read_lock();
2163 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
2164 /* Validate delete flag of the stream */
79d4ffb7 2165 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
8994307f
DG
2166 continue;
2167 }
2168 /*
2169 * Remove from pollset so the metadata thread can continue without
2170 * blocking on a deleted stream.
2171 */
2172 lttng_poll_del(pollset, stream->wait_fd);
2173
2174 /* Delete it right now */
2175 consumer_del_metadata_stream(stream, metadata_ht);
2176 }
2177 rcu_read_unlock();
2178}
2179
/*
 * Thread polls on metadata file descriptor and write them on disk or on the
 * network.
 *
 * Registers itself to RCU and to the consumerd health monitoring
 * (HEALTH_CONSUMERD_TYPE_METADATA), allocates the global metadata_ht, then
 * loops: new streams arrive through ctx->consumer_metadata_pipe and are
 * added to the poll set; readable stream fds are drained through
 * ctx->on_buffer_ready; hung-up or erroneous streams are flushed (UST) and
 * deleted. Exits when the poll set is empty and consumer_quit is set.
 * Always returns NULL.
 */
void *consumer_thread_metadata_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_stream *stream = NULL;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = data;
	ssize_t len;

	rcu_register_thread();

	/* err stays -1 until a clean exit path sets it to 0. */
	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);

	metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!metadata_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread metadata poll started");

	/* Size is set to 1 for the consumer_metadata pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events,
			lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Metadata main loop started");

	while (1) {
		/* Only the metadata pipe is set */
		if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
			err = 0;	/* All is OK */
			goto end;
		}

restart:
		DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		ret = lttng_poll_wait(&events, -1);
		DBG("Metadata event catched in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR catched");
				goto restart;
			}
			goto error;
		}

		nb_fd = ret;

		/* From here, the event is a metadata wait fd */
		for (i = 0; i < nb_fd; i++) {
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
				if (revents & (LPOLLERR | LPOLLHUP )) {
					DBG("Metadata thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events,
							lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
					lttng_pipe_read_close(ctx->consumer_metadata_pipe);
					continue;
				} else if (revents & LPOLLIN) {
					ssize_t pipe_len;

					/* The pipe carries a stream pointer, written whole. */
					pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
							&stream, sizeof(stream));
					if (pipe_len < 0) {
						ERR("read metadata stream, ret: %zd", pipe_len);
						/*
						 * Continue here to handle the rest of the streams.
						 */
						continue;
					}

					/* A NULL stream means that the state has changed. */
					if (stream == NULL) {
						/* Check for deleted streams. */
						validate_endpoint_status_metadata_stream(&events);
						goto restart;
					}

					DBG("Adding metadata stream %d to poll set",
							stream->wait_fd);

					/* Add metadata stream to the global poll events list */
					lttng_poll_add(&events, stream->wait_fd,
							LPOLLIN | LPOLLPRI);
				}

				/* Handle other stream */
				continue;
			}

			/* Look up the stream owning this fd (key is the fd itself). */
			rcu_read_lock();
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			assert(node);

			stream = caa_container_of(node, struct lttng_consumer_stream,
					node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Metadata fd %d is hup|err.", pollfd);
				if (!stream->hangup_flush_done
						&& (consumer_data.type == LTTNG_CONSUMER32_UST
							|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
					DBG("Attempting to flush and consume the UST buffers");
					lttng_ustconsumer_on_stream_hangup(stream);

					/* We just flushed the stream now read it. */
					do {
						len = ctx->on_buffer_ready(stream, ctx);
						/*
						 * We don't check the return value here since if we get
						 * a negative len, it means an error occured thus we
						 * simply remove it from the poll set and free the
						 * stream.
						 */
					} while (len > 0);
				}

				lttng_poll_del(&events, stream->wait_fd);
				/*
				 * This call update the channel states, closes file descriptors
				 * and securely free the stream.
				 */
				consumer_del_metadata_stream(stream, metadata_ht);
			} else if (revents & (LPOLLIN | LPOLLPRI)) {
				/* Get the data out of the metadata file descriptor */
				DBG("Metadata available on fd %d", pollfd);
				assert(stream->wait_fd == pollfd);

				do {
					len = ctx->on_buffer_ready(stream, ctx);
					/*
					 * We don't check the return value here since if we get
					 * a negative len, it means an error occured thus we
					 * simply remove it from the poll set and free the
					 * stream.
					 */
				} while (len > 0);

				/* It's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean up stream from consumer and free it. */
					lttng_poll_del(&events, stream->wait_fd);
					consumer_del_metadata_stream(stream, metadata_ht);
				}
			}

			/* Release RCU lock for the stream looked up */
			rcu_read_unlock();
		}
	}

	/*
	 * NOTE(review): the while(1) above only exits via goto, so this
	 * statement appears unreachable — kept as-is.
	 */
	/* All is OK */
	err = 0;
error:
end:
	DBG("Metadata poll thread exiting");

	lttng_poll_clean(&events);
end_poll:
	destroy_stream_ht(metadata_ht);
end_ht:
	/* Report thread outcome to the health subsystem before leaving. */
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return NULL;
}
2377
/*
 * This thread polls the fds in the set to consume the data and write
 * it to tracefile if necessary.
 *
 * Registers itself to RCU and to the consumerd health monitoring
 * (HEALTH_CONSUMERD_TYPE_DATA), allocates the global data_ht, then loops:
 * rebuilds its local pollfd/stream arrays whenever consumer_data.need_update
 * is set, polls, services POLLPRI (urgent) streams first, then POLLIN
 * (normal) streams, and finally reaps hung-up/error streams that produced
 * no data in this pass. Exits when there are no fds left and consumer_quit
 * is set. Always returns NULL.
 */
void *consumer_thread_data_poll(void *data)
{
	int num_rdy, num_hup, high_prio, ret, i, err = -1;
	struct pollfd *pollfd = NULL;
	/* local view of the streams */
	struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
	/* local view of consumer_data.fds_count */
	int nb_fd = 0;
	struct lttng_consumer_local_data *ctx = data;
	ssize_t len;

	rcu_register_thread();

	/* err stays -1 until a clean exit path sets it to 0. */
	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);

	data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (data_ht == NULL) {
		/* ENOMEM at this point. Better to bail out. */
		goto end;
	}

	/* Placeholder array; replaced on the first need_update pass below. */
	local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
	if (local_stream == NULL) {
		PERROR("local_stream malloc");
		goto end;
	}

	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * the fds set has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&consumer_data.lock);
		if (consumer_data.need_update) {
			free(pollfd);
			pollfd = NULL;

			free(local_stream);
			local_stream = NULL;

			/* allocate for all fds + 1 for the consumer_data_pipe */
			pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				PERROR("pollfd malloc");
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the consumer_data_pipe */
			local_stream = zmalloc((consumer_data.stream_count + 1) *
					sizeof(struct lttng_consumer_stream *));
			if (local_stream == NULL) {
				PERROR("local_stream malloc");
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}
			ret = update_poll_array(ctx, &pollfd, local_stream,
					data_ht);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}
			nb_fd = ret;
			consumer_data.need_update = 0;
		}
		pthread_mutex_unlock(&consumer_data.lock);

		/* No FDs and consumer_quit, consumer_cleanup the thread */
		if (nb_fd == 0 && consumer_quit == 1) {
			err = 0;	/* All is OK */
			goto end;
		}
		/* poll on the array of fds */
	restart:
		DBG("polling on %d fd", nb_fd + 1);
		/* The extra slot (index nb_fd) is the consumer_data_pipe. */
		num_rdy = poll(pollfd, nb_fd + 1, -1);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			/*
			 * Restart interrupted system call.
			 */
			if (errno == EINTR) {
				goto restart;
			}
			PERROR("Poll error");
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		/*
		 * If the consumer_data_pipe triggered poll go directly to the
		 * beginning of the loop to update the array. We want to prioritize
		 * array update over low-priority reads.
		 */
		if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
			ssize_t pipe_readlen;

			DBG("consumer_data_pipe wake up");
			/* Drain one stream pointer from the pipe. */
			pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
					&new_stream, sizeof(new_stream));
			if (pipe_readlen < 0) {
				ERR("Consumer data pipe ret %zd", pipe_readlen);
				/* Continue so we can at least handle the current stream(s). */
				continue;
			}

			/*
			 * If the stream is NULL, just ignore it. It's also possible that
			 * the sessiond poll thread changed the consumer_quit state and is
			 * waking us up to test it.
			 */
			if (new_stream == NULL) {
				validate_endpoint_status_data_stream();
				continue;
			}

			/* Continue to update the local streams and handle prio ones */
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			if (local_stream[i] == NULL) {
				continue;
			}
			if (pollfd[i].revents & POLLPRI) {
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				len = ctx->on_buffer_ready(local_stream[i], ctx);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
				} else if (len > 0) {
					local_stream[i]->data_read = 1;
				}
			}
		}

		/*
		 * If we read high prio channel in this loop, try again
		 * for more high prio data.
		 */
		if (high_prio) {
			continue;
		}

		/* Take care of low priority channels. */
		for (i = 0; i < nb_fd; i++) {
			if (local_stream[i] == NULL) {
				continue;
			}
			if ((pollfd[i].revents & POLLIN) ||
					local_stream[i]->hangup_flush_done) {
				DBG("Normal read on fd %d", pollfd[i].fd);
				len = ctx->on_buffer_ready(local_stream[i], ctx);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
				} else if (len > 0) {
					local_stream[i]->data_read = 1;
				}
			}
		}

		/* Handle hangup and errors */
		for (i = 0; i < nb_fd; i++) {
			if (local_stream[i] == NULL) {
				continue;
			}
			if (!local_stream[i]->hangup_flush_done
					&& (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
					&& (consumer_data.type == LTTNG_CONSUMER32_UST
						|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
				DBG("fd %d is hup|err|nval. Attempting flush and read.",
						pollfd[i].fd);
				lttng_ustconsumer_on_stream_hangup(local_stream[i]);
				/* Attempt read again, for the data we just flushed. */
				local_stream[i]->data_read = 1;
			}
			/*
			 * If the poll flag is HUP/ERR/NVAL and we have
			 * read no data in this pass, we can remove the
			 * stream from its hash table.
			 */
			if ((pollfd[i].revents & POLLHUP)) {
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				if (!local_stream[i]->data_read) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
					num_hup++;
				}
			} else if (pollfd[i].revents & POLLERR) {
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				if (!local_stream[i]->data_read) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
					num_hup++;
				}
			} else if (pollfd[i].revents & POLLNVAL) {
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				if (!local_stream[i]->data_read) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
					num_hup++;
				}
			}
			/* Reset per-pass flag for surviving streams. */
			if (local_stream[i] != NULL) {
				local_stream[i]->data_read = 0;
			}
		}
	}
	/*
	 * NOTE(review): the while(1) above only exits via goto, so this
	 * statement appears unreachable — kept as-is.
	 */
	/* All is OK */
	err = 0;
end:
	DBG("polling thread exiting");
	free(pollfd);
	free(local_stream);

	/*
	 * Close the write side of the pipe so epoll_wait() in
	 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
	 * read side of the pipe. If we close them both, epoll_wait strangely does
	 * not return and could create a endless wait period if the pipe is the
	 * only tracked fd in the poll set. The thread will take care of closing
	 * the read side.
	 */
	(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);

	destroy_data_stream_ht(data_ht);

	/* Report thread outcome to the health subsystem before leaving. */
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return NULL;
}
2633
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 *
 * Streams are found through the per-channel-id hash table (duplicates
 * allowed, hence the _duplicate iteration). Each stream's lock is taken to
 * serialize against concurrent teardown; already-deleted nodes are skipped.
 */
static
void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	ht = consumer_data.stream_per_chan_id_ht;

	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key,
			&iter.iter, stream, node_channel_id.node) {
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			/* Stream is being torn down concurrently; nothing to do. */
			goto next;
		}
		switch (consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			/* Kernel streams need no wake-up fd handling here. */
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			/*
			 * Note: a mutex is taken internally within
			 * liblttng-ust-ctl to protect timer wakeup_fd
			 * use from concurrent close.
			 */
			lttng_ustconsumer_close_stream_wakeup(stream);
			break;
		default:
			ERR("Unknown consumer_data type");
			assert(0);
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	rcu_read_unlock();
}
2681
2682static void destroy_channel_ht(struct lttng_ht *ht)
2683{
2684 struct lttng_ht_iter iter;
2685 struct lttng_consumer_channel *channel;
2686 int ret;
2687
2688 if (ht == NULL) {
2689 return;
2690 }
2691
2692 rcu_read_lock();
2693 cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
2694 ret = lttng_ht_del(ht, &iter);
2695 assert(ret != 0);
2696 }
2697 rcu_read_unlock();
2698
2699 lttng_ht_destroy(ht);
2700}
2701
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 *
 * Registers itself to RCU and to the consumerd health monitoring
 * (HEALTH_CONSUMERD_TYPE_CHANNEL). Channel ADD/DEL/QUIT commands arrive
 * through ctx->consumer_channel_pipe[0]; channel fd hangup/error causes the
 * channel's streams to be closed and the channel refcount dropped. Always
 * returns NULL.
 */
void *consumer_thread_channel_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = NULL;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = data;
	struct lttng_ht *channel_ht;

	rcu_register_thread();

	/* err stays -1 until a clean exit path sets it to 0. */
	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Size is set to 1 for the consumer_channel pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (1) {
		/* Only the channel pipe is set */
		if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
			err = 0;	/* All is OK */
			goto end;
		}

restart:
		DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel event catched in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR catched");
				goto restart;
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			/* Just don't waste time if no returned events for the fd */
			if (!revents) {
				continue;
			}
			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else if (revents & LPOLLIN) {
					enum consumer_channel_action action;
					uint64_t key;

					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						ERR("Error reading channel pipe");
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
						DBG("Adding channel %d to poll set",
							chan->wait_fd);

						lttng_ht_node_init_u64(&chan->wait_fd_node,
							chan->wait_fd);
						rcu_read_lock();
						lttng_ht_add_unique_u64(channel_ht,
								&chan->wait_fd_node);
						rcu_read_unlock();
						/* Add channel to the global poll events list */
						lttng_poll_add(&events, chan->wait_fd,
								LPOLLIN | LPOLLPRI);
						break;
					case CONSUMER_CHANNEL_DEL:
					{
						struct lttng_consumer_stream *stream, *stmp;

						rcu_read_lock();
						chan = consumer_find_channel(key);
						if (!chan) {
							rcu_read_unlock();
							ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						assert(ret == 0);
						consumer_close_channel_streams(chan);

						switch (consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							/* Delete streams that might have been left in the stream list. */
							cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
									send_node) {
								cds_list_del(&stream->send_node);
								lttng_ustconsumer_del_stream(stream);
								uatomic_sub(&stream->chan->refcount, 1);
								/*
								 * NOTE(review): asserting an address is
								 * always true; this likely intended to
								 * check the refcount value — confirm.
								 */
								assert(&chan->refcount);
								free(stream);
							}
							break;
						default:
							ERR("Unknown consumer_data type");
							assert(0);
						}

						/*
						 * Release our own refcount. Force channel deletion even if
						 * streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						rcu_read_unlock();
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue the loop
						 * since their might be data to consume.
						 */
						lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				}

				/* Handle other stream */
				continue;
			}

			/* Look up the channel owning this fd (key is the fd itself). */
			rcu_read_lock();
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			assert(node);

			chan = caa_container_of(node, struct lttng_consumer_channel,
					wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				assert(ret == 0);
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1)
						&& !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			}

			/* Release RCU lock for the channel looked up */
			rcu_read_unlock();
		}
	}

	/*
	 * NOTE(review): the while(1) above only exits via goto, so this
	 * statement appears unreachable — kept as-is.
	 */
	/* All is OK */
	err = 0;
end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
	DBG("Channel poll thread exiting");
	/* Report thread outcome to the health subsystem before leaving. */
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return NULL;
}
2924
331744e3
JD
2925static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
2926 struct pollfd *sockpoll, int client_socket)
2927{
2928 int ret;
2929
2930 assert(ctx);
2931 assert(sockpoll);
2932
2933 if (lttng_consumer_poll_socket(sockpoll) < 0) {
2934 ret = -1;
2935 goto error;
2936 }
2937 DBG("Metadata connection on client_socket");
2938
2939 /* Blocking call, waiting for transmission */
2940 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
2941 if (ctx->consumer_metadata_socket < 0) {
2942 WARN("On accept metadata");
2943 ret = -1;
2944 goto error;
2945 }
2946 ret = 0;
2947
2948error:
2949 return ret;
2950}
2951
3bd1e081
MD
2952/*
2953 * This thread listens on the consumerd socket and receives the file
2954 * descriptors from the session daemon.
2955 */
7d980def 2956void *consumer_thread_sessiond_poll(void *data)
3bd1e081 2957{
1fc79fb4 2958 int sock = -1, client_socket, ret, err = -1;
3bd1e081
MD
2959 /*
2960 * structure to poll for incoming data on communication socket avoids
2961 * making blocking sockets.
2962 */
2963 struct pollfd consumer_sockpoll[2];
2964 struct lttng_consumer_local_data *ctx = data;
2965
e7b994a3
DG
2966 rcu_register_thread();
2967
1fc79fb4
MD
2968 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
2969
3bd1e081
MD
2970 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
2971 unlink(ctx->consumer_command_sock_path);
2972 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
2973 if (client_socket < 0) {
2974 ERR("Cannot create command socket");
2975 goto end;
2976 }
2977
2978 ret = lttcomm_listen_unix_sock(client_socket);
2979 if (ret < 0) {
2980 goto end;
2981 }
2982
32258573 2983 DBG("Sending ready command to lttng-sessiond");
f73fabfd 2984 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
3bd1e081
MD
2985 /* return < 0 on error, but == 0 is not fatal */
2986 if (ret < 0) {
32258573 2987 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
2988 goto end;
2989 }
2990
3bd1e081
MD
2991 /* prepare the FDs to poll : to client socket and the should_quit pipe */
2992 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
2993 consumer_sockpoll[0].events = POLLIN | POLLPRI;
2994 consumer_sockpoll[1].fd = client_socket;
2995 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2996
2997 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2998 goto end;
2999 }
3000 DBG("Connection on client_socket");
3001
3002 /* Blocking call, waiting for transmission */
3003 sock = lttcomm_accept_unix_sock(client_socket);
534d2592 3004 if (sock < 0) {
3bd1e081
MD
3005 WARN("On accept");
3006 goto end;
3007 }
3bd1e081 3008
331744e3
JD
3009 /*
3010 * Setup metadata socket which is the second socket connection on the
3011 * command unix socket.
3012 */
3013 ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
3014 if (ret < 0) {
3015 goto end;
3016 }
3017
d96f09c6
DG
3018 /* This socket is not useful anymore. */
3019 ret = close(client_socket);
3020 if (ret < 0) {
3021 PERROR("close client_socket");
3022 }
3023 client_socket = -1;
3024
3bd1e081
MD
3025 /* update the polling structure to poll on the established socket */
3026 consumer_sockpoll[1].fd = sock;
3027 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3028
3029 while (1) {
3030 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
3031 goto end;
3032 }
3033 DBG("Incoming command on sock");
3034 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
3035 if (ret == -ENOENT) {
3036 DBG("Received STOP command");
3037 goto end;
3038 }
4cbc1a04
DG
3039 if (ret <= 0) {
3040 /*
3041 * This could simply be a session daemon quitting. Don't output
3042 * ERR() here.
3043 */
3044 DBG("Communication interrupted on command socket");
41ba6035 3045 err = 0;
3bd1e081
MD
3046 goto end;
3047 }
3048 if (consumer_quit) {
3049 DBG("consumer_thread_receive_fds received quit from signal");
1fc79fb4 3050 err = 0; /* All is OK */
3bd1e081
MD
3051 goto end;
3052 }
ffe60014 3053 DBG("received command on sock");
3bd1e081 3054 }
1fc79fb4
MD
3055 /* All is OK */
3056 err = 0;
3057
3bd1e081 3058end:
ffe60014 3059 DBG("Consumer thread sessiond poll exiting");
3bd1e081 3060
d88aee68
DG
3061 /*
3062 * Close metadata streams since the producer is the session daemon which
3063 * just died.
3064 *
3065 * NOTE: for now, this only applies to the UST tracer.
3066 */
3067 lttng_consumer_close_metadata();
3068
3bd1e081
MD
3069 /*
3070 * when all fds have hung up, the polling thread
3071 * can exit cleanly
3072 */
3073 consumer_quit = 1;
3074
04fdd819 3075 /*
c869f647 3076 * Notify the data poll thread to poll back again and test the
8994307f 3077 * consumer_quit state that we just set so to quit gracefully.
04fdd819 3078 */
acdb9057 3079 notify_thread_lttng_pipe(ctx->consumer_data_pipe);
c869f647 3080
a0cbdd2e 3081 notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
d8ef542d 3082
d96f09c6
DG
3083 /* Cleaning up possibly open sockets. */
3084 if (sock >= 0) {
3085 ret = close(sock);
3086 if (ret < 0) {
3087 PERROR("close sock sessiond poll");
3088 }
3089 }
3090 if (client_socket >= 0) {
38476d24 3091 ret = close(client_socket);
d96f09c6
DG
3092 if (ret < 0) {
3093 PERROR("close client_socket sessiond poll");
3094 }
3095 }
3096
1fc79fb4
MD
3097 if (err) {
3098 health_error();
3099 ERR("Health error occurred in %s", __func__);
3100 }
3101 health_unregister(health_consumerd);
3102
e7b994a3 3103 rcu_unregister_thread();
3bd1e081
MD
3104 return NULL;
3105}
d41f73b7 3106
4078b776 3107ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
3108 struct lttng_consumer_local_data *ctx)
3109{
74251bb8
DG
3110 ssize_t ret;
3111
3112 pthread_mutex_lock(&stream->lock);
94d49140
JD
3113 if (stream->metadata_flag) {
3114 pthread_mutex_lock(&stream->metadata_rdv_lock);
3115 }
74251bb8 3116
d41f73b7
MD
3117 switch (consumer_data.type) {
3118 case LTTNG_CONSUMER_KERNEL:
74251bb8
DG
3119 ret = lttng_kconsumer_read_subbuffer(stream, ctx);
3120 break;
7753dea8
MD
3121 case LTTNG_CONSUMER32_UST:
3122 case LTTNG_CONSUMER64_UST:
74251bb8
DG
3123 ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
3124 break;
d41f73b7
MD
3125 default:
3126 ERR("Unknown consumer_data type");
3127 assert(0);
74251bb8
DG
3128 ret = -ENOSYS;
3129 break;
d41f73b7 3130 }
74251bb8 3131
94d49140
JD
3132 if (stream->metadata_flag) {
3133 pthread_cond_broadcast(&stream->metadata_rdv);
3134 pthread_mutex_unlock(&stream->metadata_rdv_lock);
3135 }
74251bb8
DG
3136 pthread_mutex_unlock(&stream->lock);
3137 return ret;
d41f73b7
MD
3138}
3139
3140int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3141{
3142 switch (consumer_data.type) {
3143 case LTTNG_CONSUMER_KERNEL:
3144 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
3145 case LTTNG_CONSUMER32_UST:
3146 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
3147 return lttng_ustconsumer_on_recv_stream(stream);
3148 default:
3149 ERR("Unknown consumer_data type");
3150 assert(0);
3151 return -ENOSYS;
3152 }
3153}
e4421fec
DG
3154
3155/*
3156 * Allocate and set consumer data hash tables.
3157 */
3158void lttng_consumer_init(void)
3159{
d88aee68
DG
3160 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3161 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3162 consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
d8ef542d 3163 consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
e4421fec 3164}
7735ef9e
DG
3165
3166/*
3167 * Process the ADD_RELAYD command receive by a consumer.
3168 *
3169 * This will create a relayd socket pair and add it to the relayd hash table.
3170 * The caller MUST acquire a RCU read side lock before calling it.
3171 */
da009f2c 3172int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
7735ef9e 3173 struct lttng_consumer_local_data *ctx, int sock,
6151a90f 3174 struct pollfd *consumer_sockpoll,
d3e2ba59
JD
3175 struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
3176 uint64_t relayd_session_id)
7735ef9e 3177{
cd2b09ed 3178 int fd = -1, ret = -1, relayd_created = 0;
f50f23d9 3179 enum lttng_error_code ret_code = LTTNG_OK;
d4298c99 3180 struct consumer_relayd_sock_pair *relayd = NULL;
7735ef9e 3181
6151a90f
JD
3182 assert(ctx);
3183 assert(relayd_sock);
3184
da009f2c 3185 DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
7735ef9e
DG
3186
3187 /* Get relayd reference if exists. */
3188 relayd = consumer_find_relayd(net_seq_idx);
3189 if (relayd == NULL) {
da009f2c 3190 assert(sock_type == LTTNG_STREAM_CONTROL);
7735ef9e
DG
3191 /* Not found. Allocate one. */
3192 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
3193 if (relayd == NULL) {
0d08d75e 3194 ret = -ENOMEM;
618a6a28
MD
3195 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
3196 goto error;
0d08d75e 3197 } else {
30319bcb 3198 relayd->sessiond_session_id = sessiond_id;
0d08d75e 3199 relayd_created = 1;
7735ef9e 3200 }
0d08d75e
DG
3201
3202 /*
3203 * This code path MUST continue to the consumer send status message to
3204 * we can notify the session daemon and continue our work without
3205 * killing everything.
3206 */
da009f2c
MD
3207 } else {
3208 /*
3209 * relayd key should never be found for control socket.
3210 */
3211 assert(sock_type != LTTNG_STREAM_CONTROL);
0d08d75e
DG
3212 }
3213
3214 /* First send a status message before receiving the fds. */
618a6a28
MD
3215 ret = consumer_send_status_msg(sock, LTTNG_OK);
3216 if (ret < 0) {
0d08d75e 3217 /* Somehow, the session daemon is not responding anymore. */
618a6a28
MD
3218 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3219 goto error_nosignal;
7735ef9e
DG
3220 }
3221
3222 /* Poll on consumer socket. */
3223 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
0d08d75e 3224 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
7735ef9e 3225 ret = -EINTR;
618a6a28 3226 goto error_nosignal;
7735ef9e
DG
3227 }
3228
3229 /* Get relayd socket from session daemon */
3230 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
3231 if (ret != sizeof(fd)) {
7735ef9e 3232 ret = -1;
4028eeb9 3233 fd = -1; /* Just in case it gets set with an invalid value. */
0d08d75e
DG
3234
3235 /*
3236 * Failing to receive FDs might indicate a major problem such as
3237 * reaching a fd limit during the receive where the kernel returns a
3238 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
3239 * don't take any chances and stop everything.
3240 *
3241 * XXX: Feature request #558 will fix that and avoid this possible
3242 * issue when reaching the fd limit.
3243 */
3244 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
618a6a28 3245 ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
f50f23d9
DG
3246 goto error;
3247 }
3248
7735ef9e
DG
3249 /* Copy socket information and received FD */
3250 switch (sock_type) {
3251 case LTTNG_STREAM_CONTROL:
3252 /* Copy received lttcomm socket */
6151a90f
JD
3253 lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
3254 ret = lttcomm_create_sock(&relayd->control_sock.sock);
4028eeb9 3255 /* Handle create_sock error. */
f66c074c 3256 if (ret < 0) {
618a6a28 3257 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
4028eeb9 3258 goto error;
f66c074c 3259 }
da009f2c
MD
3260 /*
3261 * Close the socket created internally by
3262 * lttcomm_create_sock, so we can replace it by the one
3263 * received from sessiond.
3264 */
3265 if (close(relayd->control_sock.sock.fd)) {
3266 PERROR("close");
3267 }
7735ef9e
DG
3268
3269 /* Assign new file descriptor */
6151a90f 3270 relayd->control_sock.sock.fd = fd;
4b29f1ce 3271 fd = -1; /* For error path */
6151a90f
JD
3272 /* Assign version values. */
3273 relayd->control_sock.major = relayd_sock->major;
3274 relayd->control_sock.minor = relayd_sock->minor;
c5b6f4f0 3275
d3e2ba59 3276 relayd->relayd_session_id = relayd_session_id;
c5b6f4f0 3277
7735ef9e
DG
3278 break;
3279 case LTTNG_STREAM_DATA:
3280 /* Copy received lttcomm socket */
6151a90f
JD
3281 lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
3282 ret = lttcomm_create_sock(&relayd->data_sock.sock);
4028eeb9 3283 /* Handle create_sock error. */
f66c074c 3284 if (ret < 0) {
618a6a28 3285 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
4028eeb9 3286 goto error;
f66c074c 3287 }
da009f2c
MD
3288 /*
3289 * Close the socket created internally by
3290 * lttcomm_create_sock, so we can replace it by the one
3291 * received from sessiond.
3292 */
3293 if (close(relayd->data_sock.sock.fd)) {
3294 PERROR("close");
3295 }
7735ef9e
DG
3296
3297 /* Assign new file descriptor */
6151a90f 3298 relayd->data_sock.sock.fd = fd;
4b29f1ce 3299 fd = -1; /* for eventual error paths */
6151a90f
JD
3300 /* Assign version values. */
3301 relayd->data_sock.major = relayd_sock->major;
3302 relayd->data_sock.minor = relayd_sock->minor;
7735ef9e
DG
3303 break;
3304 default:
3305 ERR("Unknown relayd socket type (%d)", sock_type);
59e71485 3306 ret = -1;
618a6a28 3307 ret_code = LTTCOMM_CONSUMERD_FATAL;
7735ef9e
DG
3308 goto error;
3309 }
3310
d88aee68 3311 DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
7735ef9e
DG
3312 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
3313 relayd->net_seq_idx, fd);
3314
618a6a28
MD
3315 /* We successfully added the socket. Send status back. */
3316 ret = consumer_send_status_msg(sock, ret_code);
3317 if (ret < 0) {
3318 /* Somehow, the session daemon is not responding anymore. */
3319 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3320 goto error_nosignal;
3321 }
3322
7735ef9e
DG
3323 /*
3324 * Add relayd socket pair to consumer data hashtable. If object already
3325 * exists or on error, the function gracefully returns.
3326 */
d09e1200 3327 add_relayd(relayd);
7735ef9e
DG
3328
3329 /* All good! */
4028eeb9 3330 return 0;
7735ef9e
DG
3331
3332error:
618a6a28
MD
3333 if (consumer_send_status_msg(sock, ret_code) < 0) {
3334 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3335 }
3336
3337error_nosignal:
4028eeb9
DG
3338 /* Close received socket if valid. */
3339 if (fd >= 0) {
3340 if (close(fd)) {
3341 PERROR("close received socket");
3342 }
3343 }
cd2b09ed
DG
3344
3345 if (relayd_created) {
cd2b09ed
DG
3346 free(relayd);
3347 }
3348
7735ef9e
DG
3349 return ret;
3350}
ca22feea 3351
4e9a4686
DG
3352/*
3353 * Try to lock the stream mutex.
3354 *
3355 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
3356 */
3357static int stream_try_lock(struct lttng_consumer_stream *stream)
3358{
3359 int ret;
3360
3361 assert(stream);
3362
3363 /*
3364 * Try to lock the stream mutex. On failure, we know that the stream is
3365 * being used else where hence there is data still being extracted.
3366 */
3367 ret = pthread_mutex_trylock(&stream->lock);
3368 if (ret) {
3369 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
3370 ret = 0;
3371 goto end;
3372 }
3373
3374 ret = 1;
3375
3376end:
3377 return ret;
3378}
3379
f7079f67
DG
3380/*
3381 * Search for a relayd associated to the session id and return the reference.
3382 *
3383 * A rcu read side lock MUST be acquire before calling this function and locked
3384 * until the relayd object is no longer necessary.
3385 */
3386static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3387{
3388 struct lttng_ht_iter iter;
f7079f67 3389 struct consumer_relayd_sock_pair *relayd = NULL;
f7079f67
DG
3390
3391 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3392 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
3393 node.node) {
18261bd1
DG
3394 /*
3395 * Check by sessiond id which is unique here where the relayd session
3396 * id might not be when having multiple relayd.
3397 */
3398 if (relayd->sessiond_session_id == id) {
f7079f67 3399 /* Found the relayd. There can be only one per id. */
18261bd1 3400 goto found;
f7079f67
DG
3401 }
3402 }
3403
18261bd1
DG
3404 return NULL;
3405
3406found:
f7079f67
DG
3407 return relayd;
3408}
3409
ca22feea
DG
3410/*
3411 * Check if for a given session id there is still data needed to be extract
3412 * from the buffers.
3413 *
6d805429 3414 * Return 1 if data is pending or else 0 meaning ready to be read.
ca22feea 3415 */
6d805429 3416int consumer_data_pending(uint64_t id)
ca22feea
DG
3417{
3418 int ret;
3419 struct lttng_ht_iter iter;
3420 struct lttng_ht *ht;
3421 struct lttng_consumer_stream *stream;
f7079f67 3422 struct consumer_relayd_sock_pair *relayd = NULL;
6d805429 3423 int (*data_pending)(struct lttng_consumer_stream *);
ca22feea 3424
6d805429 3425 DBG("Consumer data pending command on session id %" PRIu64, id);
ca22feea 3426
6f6eda74 3427 rcu_read_lock();
ca22feea
DG
3428 pthread_mutex_lock(&consumer_data.lock);
3429
3430 switch (consumer_data.type) {
3431 case LTTNG_CONSUMER_KERNEL:
6d805429 3432 data_pending = lttng_kconsumer_data_pending;
ca22feea
DG
3433 break;
3434 case LTTNG_CONSUMER32_UST:
3435 case LTTNG_CONSUMER64_UST:
6d805429 3436 data_pending = lttng_ustconsumer_data_pending;
ca22feea
DG
3437 break;
3438 default:
3439 ERR("Unknown consumer data type");
3440 assert(0);
3441 }
3442
3443 /* Ease our life a bit */
3444 ht = consumer_data.stream_list_ht;
3445
f7079f67
DG
3446 relayd = find_relayd_by_session_id(id);
3447 if (relayd) {
3448 /* Send init command for data pending. */
3449 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
3450 ret = relayd_begin_data_pending(&relayd->control_sock,
3451 relayd->relayd_session_id);
3452 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3453 if (ret < 0) {
3454 /* Communication error thus the relayd so no data pending. */
3455 goto data_not_pending;
3456 }
3457 }
3458
c8f59ee5 3459 cds_lfht_for_each_entry_duplicate(ht->ht,
d88aee68
DG
3460 ht->hash_fct(&id, lttng_ht_seed),
3461 ht->match_fct, &id,
ca22feea 3462 &iter.iter, stream, node_session_id.node) {
4e9a4686
DG
3463 /* If this call fails, the stream is being used hence data pending. */
3464 ret = stream_try_lock(stream);
3465 if (!ret) {
f7079f67 3466 goto data_pending;
ca22feea 3467 }
ca22feea 3468
4e9a4686
DG
3469 /*
3470 * A removed node from the hash table indicates that the stream has
3471 * been deleted thus having a guarantee that the buffers are closed
3472 * on the consumer side. However, data can still be transmitted
3473 * over the network so don't skip the relayd check.
3474 */
3475 ret = cds_lfht_is_node_deleted(&stream->node.node);
3476 if (!ret) {
e5d1a9b3
MD
3477 /*
3478 * An empty output file is not valid. We need at least one packet
3479 * generated per stream, even if it contains no event, so it
3480 * contains at least one packet header.
3481 */
3482 if (stream->output_written == 0) {
3483 pthread_mutex_unlock(&stream->lock);
3484 goto data_pending;
3485 }
4e9a4686 3486 /* Check the stream if there is data in the buffers. */
6d805429
DG
3487 ret = data_pending(stream);
3488 if (ret == 1) {
4e9a4686 3489 pthread_mutex_unlock(&stream->lock);
f7079f67 3490 goto data_pending;
4e9a4686
DG
3491 }
3492 }
3493
3494 /* Relayd check */
f7079f67 3495 if (relayd) {
c8f59ee5
DG
3496 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
3497 if (stream->metadata_flag) {
ad7051c0
DG
3498 ret = relayd_quiescent_control(&relayd->control_sock,
3499 stream->relayd_stream_id);
c8f59ee5 3500 } else {
6d805429 3501 ret = relayd_data_pending(&relayd->control_sock,
39df6d9f
DG
3502 stream->relayd_stream_id,
3503 stream->next_net_seq_num - 1);
c8f59ee5
DG
3504 }
3505 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
6d805429 3506 if (ret == 1) {
4e9a4686 3507 pthread_mutex_unlock(&stream->lock);
f7079f67 3508 goto data_pending;
c8f59ee5
DG
3509 }
3510 }
4e9a4686 3511 pthread_mutex_unlock(&stream->lock);
c8f59ee5 3512 }
ca22feea 3513
f7079f67
DG
3514 if (relayd) {
3515 unsigned int is_data_inflight = 0;
3516
3517 /* Send init command for data pending. */
3518 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
3519 ret = relayd_end_data_pending(&relayd->control_sock,
3520 relayd->relayd_session_id, &is_data_inflight);
3521 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
bdd88757 3522 if (ret < 0) {
f7079f67
DG
3523 goto data_not_pending;
3524 }
bdd88757
DG
3525 if (is_data_inflight) {
3526 goto data_pending;
3527 }
f7079f67
DG
3528 }
3529
ca22feea 3530 /*
f7079f67
DG
3531 * Finding _no_ node in the hash table and no inflight data means that the
3532 * stream(s) have been removed thus data is guaranteed to be available for
3533 * analysis from the trace files.
ca22feea
DG
3534 */
3535
f7079f67 3536data_not_pending:
ca22feea
DG
3537 /* Data is available to be read by a viewer. */
3538 pthread_mutex_unlock(&consumer_data.lock);
c8f59ee5 3539 rcu_read_unlock();
6d805429 3540 return 0;
ca22feea 3541
f7079f67 3542data_pending:
ca22feea
DG
3543 /* Data is still being extracted from buffers. */
3544 pthread_mutex_unlock(&consumer_data.lock);
c8f59ee5 3545 rcu_read_unlock();
6d805429 3546 return 1;
ca22feea 3547}
f50f23d9
DG
3548
3549/*
3550 * Send a ret code status message to the sessiond daemon.
3551 *
3552 * Return the sendmsg() return value.
3553 */
3554int consumer_send_status_msg(int sock, int ret_code)
3555{
3556 struct lttcomm_consumer_status_msg msg;
3557
3558 msg.ret_code = ret_code;
3559
3560 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3561}
ffe60014
DG
3562
3563/*
3564 * Send a channel status message to the sessiond daemon.
3565 *
3566 * Return the sendmsg() return value.
3567 */
3568int consumer_send_status_channel(int sock,
3569 struct lttng_consumer_channel *channel)
3570{
3571 struct lttcomm_consumer_status_channel msg;
3572
3573 assert(sock >= 0);
3574
3575 if (!channel) {
3576 msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
3577 } else {
3578 msg.ret_code = LTTNG_OK;
3579 msg.key = channel->key;
3580 msg.stream_count = channel->streams.count;
3581 }
3582
3583 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3584}
5c786ded
JD
3585
/*
 * Using a maximum stream size with the produced and consumed position of a
 * stream, computes the new consumed position to be as close as possible to the
 * maximum possible stream size.
 *
 * If maximum stream size is lower than the possible buffer size (produced -
 * consumed), the consumed_pos given is returned untouched else the new value
 * is returned.
 */
unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
		unsigned long produced_pos, uint64_t max_stream_size)
{
	unsigned long buffered = produced_pos - consumed_pos;

	/* No limit, or the backlog already fits within the limit. */
	if (max_stream_size == 0 || buffered <= max_stream_size) {
		return consumed_pos;
	}

	/* Offset from the produced position to get the latest buffers. */
	return produced_pos - max_stream_size;
}
This page took 0.2398 seconds and 4 git commands to generate.