Move health comm to health-internal.h
[lttng-tools.git] / src / common / consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
00e2e675 4 * 2012 - David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
d14d33bf
AM
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
3bd1e081 9 *
d14d33bf
AM
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
3bd1e081 14 *
d14d33bf
AM
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
77c7c900 30#include <inttypes.h>
331744e3 31#include <signal.h>
3bd1e081 32
990570ed 33#include <common/common.h>
fb3a43a9
DG
34#include <common/utils.h>
35#include <common/compat/poll.h>
309167d2 36#include <common/index/index.h>
10a8a223 37#include <common/kernel-ctl/kernel-ctl.h>
00e2e675 38#include <common/sessiond-comm/relayd.h>
10a8a223
DG
39#include <common/sessiond-comm/sessiond-comm.h>
40#include <common/kernel-consumer/kernel-consumer.h>
00e2e675 41#include <common/relayd/relayd.h>
10a8a223 42#include <common/ust-consumer/ust-consumer.h>
d3e2ba59 43#include <common/consumer-timer.h>
10a8a223
DG
44
45#include "consumer.h"
1d1a276c 46#include "consumer-stream.h"
1fc79fb4 47#include "../bin/lttng-consumerd/health-consumerd.h"
3bd1e081
MD
48
/*
 * Global consumer state shared by all consumer threads. Mutations of the
 * stream count / update flag below are done under consumer_data.lock.
 */
struct lttng_consumer_global_data consumer_data = {
	.stream_count = 0,
	.need_update = 1,	/* Force an initial poll array rebuild. */
	.type = LTTNG_CONSUMER_UNKNOWN,
};
54
/* Commands that can be carried over the consumer channel pipe. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,	/* Register a new channel with the poll thread. */
	CONSUMER_CHANNEL_DEL,	/* Remove the channel identified by key. */
	CONSUMER_CHANNEL_QUIT,	/* Ask the channel thread to exit. */
};
60
/* Message exchanged on the channel pipe between consumer threads. */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan;	/* Used by CONSUMER_CHANNEL_ADD. */
	uint64_t key;				/* Used by CONSUMER_CHANNEL_DEL. */
};
66
3bd1e081
MD
/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 *
 * NOTE(review): volatile alone does not provide inter-thread ordering; a
 * sig_atomic_t or C11 atomic would express this flag more robustly —
 * confirm against the readers before changing.
 */
volatile int consumer_quit;
3bd1e081 74
/*
 * Global hash table containing respectively metadata and data streams. The
 * stream element in this ht should only be updated by the metadata poll thread
 * for the metadata and the data poll thread for the data.
 */
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
43c34bc3 82
acdb9057
DG
/*
 * Wake up a thread blocked in a poll wait on an lttng pipe. A NULL stream
 * pointer is written so the reader knows this is only a "global state
 * changed, poll again" notification and not a real stream to handle.
 */
static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
{
	struct lttng_consumer_stream *no_stream = NULL;

	assert(pipe);

	(void) lttng_pipe_write(pipe, &no_stream, sizeof(no_stream));
}
96
d8ef542d
MD
97static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
98 struct lttng_consumer_channel *chan,
a0cbdd2e 99 uint64_t key,
d8ef542d
MD
100 enum consumer_channel_action action)
101{
102 struct consumer_channel_msg msg;
103 int ret;
104
e56251fc
DG
105 memset(&msg, 0, sizeof(msg));
106
d8ef542d
MD
107 msg.action = action;
108 msg.chan = chan;
f21dae48 109 msg.key = key;
d8ef542d
MD
110 do {
111 ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
112 } while (ret < 0 && errno == EINTR);
113}
114
a0cbdd2e
MD
/*
 * Ask the channel management thread to delete the channel identified by
 * key. Only the key travels through the pipe; chan is NULL for a delete.
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
		uint64_t key)
{
	notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
}
120
d8ef542d
MD
121static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
122 struct lttng_consumer_channel **chan,
a0cbdd2e 123 uint64_t *key,
d8ef542d
MD
124 enum consumer_channel_action *action)
125{
126 struct consumer_channel_msg msg;
127 int ret;
128
129 do {
130 ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
131 } while (ret < 0 && errno == EINTR);
132 if (ret > 0) {
133 *action = msg.action;
134 *chan = msg.chan;
a0cbdd2e 135 *key = msg.key;
d8ef542d
MD
136 }
137 return ret;
138}
139
3bd1e081
MD
/*
 * Find a stream by key in the given hash table. The consumer_data.lock
 * must be locked during this call.
 *
 * Returns the stream, or NULL if the key is the -1ULL sentinel or no
 * matching node exists. The RCU read lock is taken/released internally;
 * NOTE(review): the returned pointer is used outside that read-side
 * section, so callers presumably rely on consumer_data.lock to keep the
 * stream alive — confirm at call sites.
 */
static struct lttng_consumer_stream *find_stream(uint64_t key,
		struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_consumer_stream *stream = NULL;

	assert(ht);

	/* -1ULL keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		return NULL;
	}

	rcu_read_lock();

	lttng_ht_lookup(ht, &key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		stream = caa_container_of(node, struct lttng_consumer_stream, node);
	}

	rcu_read_unlock();

	return stream;
}
170
da009f2c 171static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
7ad0a0cb
MD
172{
173 struct lttng_consumer_stream *stream;
174
04253271 175 rcu_read_lock();
ffe60014 176 stream = find_stream(key, ht);
04253271 177 if (stream) {
da009f2c 178 stream->key = (uint64_t) -1ULL;
04253271
MD
179 /*
180 * We don't want the lookup to match, but we still need
181 * to iterate on this stream when iterating over the hash table. Just
182 * change the node key.
183 */
da009f2c 184 stream->node.key = (uint64_t) -1ULL;
04253271
MD
185 }
186 rcu_read_unlock();
7ad0a0cb
MD
187}
188
d56db448
DG
189/*
190 * Return a channel object for the given key.
191 *
192 * RCU read side lock MUST be acquired before calling this function and
193 * protects the channel ptr.
194 */
d88aee68 195struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
3bd1e081 196{
e4421fec 197 struct lttng_ht_iter iter;
d88aee68 198 struct lttng_ht_node_u64 *node;
e4421fec 199 struct lttng_consumer_channel *channel = NULL;
3bd1e081 200
d88aee68
DG
201 /* -1ULL keys are lookup failures */
202 if (key == (uint64_t) -1ULL) {
7ad0a0cb 203 return NULL;
7a57cf92 204 }
e4421fec 205
d88aee68
DG
206 lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
207 node = lttng_ht_iter_get_node_u64(&iter);
e4421fec
DG
208 if (node != NULL) {
209 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 210 }
e4421fec
DG
211
212 return channel;
3bd1e081
MD
213}
214
ffe60014 215static void free_stream_rcu(struct rcu_head *head)
7ad0a0cb 216{
d88aee68
DG
217 struct lttng_ht_node_u64 *node =
218 caa_container_of(head, struct lttng_ht_node_u64, head);
ffe60014
DG
219 struct lttng_consumer_stream *stream =
220 caa_container_of(node, struct lttng_consumer_stream, node);
7ad0a0cb 221
ffe60014 222 free(stream);
7ad0a0cb
MD
223}
224
ffe60014 225static void free_channel_rcu(struct rcu_head *head)
702b1ea4 226{
d88aee68
DG
227 struct lttng_ht_node_u64 *node =
228 caa_container_of(head, struct lttng_ht_node_u64, head);
ffe60014
DG
229 struct lttng_consumer_channel *channel =
230 caa_container_of(node, struct lttng_consumer_channel, node);
702b1ea4 231
ffe60014 232 free(channel);
702b1ea4
MD
233}
234
00e2e675
DG
/*
 * RCU protected relayd socket pair free.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node =
		caa_container_of(head, struct lttng_ht_node_u64, head);
	struct consumer_relayd_sock_pair *relayd =
		caa_container_of(node, struct consumer_relayd_sock_pair, node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing to this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	free(relayd);
}
258
/*
 * Destroy and free relayd socket pair object.
 *
 * Removes the pair from the global relayd hash table and defers socket
 * close + free to an RCU callback. Safe to call with NULL. If the hash
 * table removal fails, another thread is assumed to be destroying the
 * object concurrently and we back off.
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == NULL) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd is being or is destroyed */
		return;
	}

	/* RCU free() call */
	call_rcu(&relayd->node.head, free_relayd_rcu);
}
283
/*
 * Remove a channel from the global list protected by a mutex. This function is
 * also responsible for freeing its data structures.
 *
 * Lock ordering (must match consumer_add_channel): consumer_data.lock,
 * then channel->lock. Streams still sitting on the channel's private send
 * list are destroyed here, the live timer is stopped, per-domain teardown
 * is dispatched, and the channel itself is reclaimed via call_rcu.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream, *stmp;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
			send_node) {
		cds_list_del(&stream->send_node);
		/*
		 * Once a stream is added to this list, the buffers were created so
		 * we have a guarantee that this call will succeed.
		 */
		consumer_stream_destroy(stream, NULL);
	}

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/* Nothing extra to tear down for the kernel domain. */
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	rcu_read_lock();
	iter.iter.node = &channel->node.node;
	ret = lttng_ht_del(consumer_data.channel_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
338
228b5bf7
DG
339/*
340 * Iterate over the relayd hash table and destroy each element. Finally,
341 * destroy the whole hash table.
342 */
343static void cleanup_relayd_ht(void)
344{
345 struct lttng_ht_iter iter;
346 struct consumer_relayd_sock_pair *relayd;
347
348 rcu_read_lock();
349
350 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
351 node.node) {
51230d70 352 consumer_destroy_relayd(relayd);
228b5bf7
DG
353 }
354
228b5bf7 355 rcu_read_unlock();
36b588ed
MD
356
357 lttng_ht_destroy(consumer_data.relayd_ht);
228b5bf7
DG
358}
359
8994307f
DG
360/*
361 * Update the end point status of all streams having the given network sequence
362 * index (relayd index).
363 *
364 * It's atomically set without having the stream mutex locked which is fine
365 * because we handle the write/read race with a pipe wakeup for each thread.
366 */
da009f2c 367static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
8994307f
DG
368 enum consumer_endpoint_status status)
369{
370 struct lttng_ht_iter iter;
371 struct lttng_consumer_stream *stream;
372
da009f2c 373 DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
8994307f
DG
374
375 rcu_read_lock();
376
377 /* Let's begin with metadata */
378 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
379 if (stream->net_seq_idx == net_seq_idx) {
380 uatomic_set(&stream->endpoint_status, status);
381 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
382 }
383 }
384
385 /* Follow up by the data streams */
386 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
387 if (stream->net_seq_idx == net_seq_idx) {
388 uatomic_set(&stream->endpoint_status, status);
389 DBG("Delete flag set to data stream %d", stream->wait_fd);
390 }
391 }
392 rcu_read_unlock();
393}
394
/*
 * Cleanup a relayd object by flagging every associated streams for deletion,
 * destroying the object meaning removing it from the relayd hash table,
 * closing the sockets and freeing the memory in a RCU call.
 *
 * If a local data context is available, notify the threads that the streams'
 * state have changed.
 */
static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
		struct lttng_consumer_local_data *ctx)
{
	uint64_t netidx;

	assert(relayd);

	DBG("Cleaning up relayd sockets");

	/* Save the net sequence index before destroying the object */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	if (ctx) {
		notify_thread_lttng_pipe(ctx->consumer_data_pipe);
		notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
	}
}
435
a6ba4fe1
DG
/*
 * Flag a relayd socket pair for destruction. Destroy it if the refcount
 * reaches zero.
 *
 * RCU read side lock MUST be aquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	assert(relayd);

	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}
454
/*
 * Completly destroy stream from every visiable data structure and the given
 * hash table if one.
 *
 * One this call returns, the stream object is not longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	/* All the heavy lifting lives in consumer-stream.c. */
	consumer_stream_destroy(stream, ht);
}
466
5ab66908
MD
/*
 * XXX naming of del vs destroy is all mixed up.
 */
/* Destroy a stream registered in the global data stream hash table. */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}
474
/* Destroy a stream registered in the global metadata stream hash table. */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
479
d88aee68
DG
/*
 * Allocate and initialize a new consumer stream object.
 *
 * channel_key/stream_key identify the channel and stream; relayd_id is
 * stored as the network sequence index; type selects metadata vs data
 * naming and init. On success the stream is returned; on failure NULL is
 * returned and, if alloc_ret is non-NULL, *alloc_ret carries the negative
 * errno-style cause. The stream is NOT inserted in any hash table here.
 */
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
		uint64_t stream_key,
		enum lttng_consumer_stream_state state,
		const char *channel_name,
		uid_t uid,
		gid_t gid,
		uint64_t relayd_id,
		uint64_t session_id,
		int cpu,
		int *alloc_ret,
		enum consumer_channel_type type,
		unsigned int monitor)
{
	int ret;
	struct lttng_consumer_stream *stream;

	stream = zmalloc(sizeof(*stream));
	if (stream == NULL) {
		PERROR("malloc struct lttng_consumer_stream");
		ret = -ENOMEM;
		goto end;
	}

	rcu_read_lock();

	stream->key = stream_key;
	stream->out_fd = -1;	/* No output file yet. */
	stream->out_fd_offset = 0;
	stream->output_written = 0;
	stream->state = state;
	stream->uid = uid;
	stream->gid = gid;
	stream->net_seq_idx = relayd_id;
	stream->session_id = session_id;
	stream->monitor = monitor;
	stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
	stream->index_fd = -1;
	pthread_mutex_init(&stream->lock, NULL);

	/* If channel is the metadata, flag this stream as metadata. */
	if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
		stream->metadata_flag = 1;
		/* Metadata is flat out. */
		strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
		/* Live rendez-vous point. */
		pthread_cond_init(&stream->metadata_rdv, NULL);
		pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
	} else {
		/* Format stream name to <channel_name>_<cpu_number> */
		ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
				channel_name, cpu);
		if (ret < 0) {
			PERROR("snprintf stream name");
			goto error;
		}
	}

	/* Key is always the wait_fd for streams. */
	lttng_ht_node_init_u64(&stream->node, stream->key);

	/* Init node per channel id key */
	lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);

	/* Init session id node with the stream session id */
	lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);

	DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
			" relayd_id %" PRIu64 ", session_id %" PRIu64,
			stream->name, stream->key, channel_key,
			stream->net_seq_idx, stream->session_id);

	rcu_read_unlock();
	return stream;

error:
	rcu_read_unlock();
	free(stream);
end:
	if (alloc_ret) {
		*alloc_ret = ret;
	}
	return NULL;
}
563
/*
 * Add a stream to the global list protected by a mutex.
 *
 * Lock ordering is strict: consumer_data.lock, channel lock, channel
 * timer lock, stream lock, then the RCU read side. The stream is inserted
 * in the data hash table, the per-channel-id table and the per-session-id
 * list table. Always returns 0.
 */
int consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;
	int ret = 0;

	assert(stream);
	assert(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	rcu_read_lock();

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	consumer_data.stream_count++;
	consumer_data.need_update = 1;

	rcu_read_unlock();
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	return ret;
}
623
5ab66908
MD
/* Remove and destroy a stream from the global data stream hash table. */
void consumer_del_data_stream(struct lttng_consumer_stream *stream)
{
	consumer_del_stream(stream, data_ht);
}
628
00e2e675 629/*
3f8e211f
DG
630 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
631 * be acquired before calling this.
00e2e675 632 */
d09e1200 633static int add_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
634{
635 int ret = 0;
d88aee68 636 struct lttng_ht_node_u64 *node;
00e2e675
DG
637 struct lttng_ht_iter iter;
638
ffe60014 639 assert(relayd);
00e2e675 640
00e2e675 641 lttng_ht_lookup(consumer_data.relayd_ht,
d88aee68
DG
642 &relayd->net_seq_idx, &iter);
643 node = lttng_ht_iter_get_node_u64(&iter);
00e2e675 644 if (node != NULL) {
00e2e675
DG
645 goto end;
646 }
d88aee68 647 lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
00e2e675 648
00e2e675
DG
649end:
650 return ret;
651}
652
653/*
654 * Allocate and return a consumer relayd socket.
655 */
656struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
da009f2c 657 uint64_t net_seq_idx)
00e2e675
DG
658{
659 struct consumer_relayd_sock_pair *obj = NULL;
660
da009f2c
MD
661 /* net sequence index of -1 is a failure */
662 if (net_seq_idx == (uint64_t) -1ULL) {
00e2e675
DG
663 goto error;
664 }
665
666 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
667 if (obj == NULL) {
668 PERROR("zmalloc relayd sock");
669 goto error;
670 }
671
672 obj->net_seq_idx = net_seq_idx;
673 obj->refcount = 0;
173af62f 674 obj->destroy_flag = 0;
f96e4545
MD
675 obj->control_sock.sock.fd = -1;
676 obj->data_sock.sock.fd = -1;
d88aee68 677 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
00e2e675
DG
678 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
679
680error:
681 return obj;
682}
683
684/*
685 * Find a relayd socket pair in the global consumer data.
686 *
687 * Return the object if found else NULL.
b0b335c8
MD
688 * RCU read-side lock must be held across this call and while using the
689 * returned object.
00e2e675 690 */
d88aee68 691struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
00e2e675
DG
692{
693 struct lttng_ht_iter iter;
d88aee68 694 struct lttng_ht_node_u64 *node;
00e2e675
DG
695 struct consumer_relayd_sock_pair *relayd = NULL;
696
697 /* Negative keys are lookup failures */
d88aee68 698 if (key == (uint64_t) -1ULL) {
00e2e675
DG
699 goto error;
700 }
701
d88aee68 702 lttng_ht_lookup(consumer_data.relayd_ht, &key,
00e2e675 703 &iter);
d88aee68 704 node = lttng_ht_iter_get_node_u64(&iter);
00e2e675
DG
705 if (node != NULL) {
706 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
707 }
708
00e2e675
DG
709error:
710 return relayd;
711}
712
/*
 * Find a relayd and send the stream
 *
 * Returns 0 on success, < 0 on error
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
		char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);
	assert(stream->net_seq_idx != -1ULL);
	assert(path);

	/* The stream is not metadata. Get relayd reference if exists. */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock, stream->name,
				path, &stream->relayd_stream_id,
				stream->chan->tracefile_size, stream->chan->tracefile_count);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			goto end;
		}

		/* Pin the relayd: it now has a stream referencing it. */
		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
				stream->key, stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
			stream->name, stream->key, stream->net_seq_idx);

end:
	rcu_read_unlock();
	return ret;
}
758
759/*
760 * Find a relayd and close the stream
761 */
762void close_relayd_stream(struct lttng_consumer_stream *stream)
763{
764 struct consumer_relayd_sock_pair *relayd;
765
766 /* The stream is not metadata. Get relayd reference if exists. */
767 rcu_read_lock();
768 relayd = consumer_find_relayd(stream->net_seq_idx);
769 if (relayd) {
770 consumer_stream_relayd_close(stream, relayd);
771 }
772 rcu_read_unlock();
773}
774
00e2e675
DG
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Return destination file descriptor or negative value on error.
 *
 * Metadata goes over the control socket (after announcing its size);
 * trace data gets a relayd data header and goes over the data socket.
 * For metadata, the caller MUST hold the relayd control socket lock.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
		size_t data_size, unsigned long padding,
		struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	assert(stream);
	assert(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);
		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be substracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
				sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
834
/*
 * Allocate and return a new lttng_consumer_channel object using the given key
 * to initialize the hash table node.
 *
 * On error, return NULL.
 */
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
		uint64_t session_id,
		const char *pathname,
		const char *name,
		uid_t uid,
		gid_t gid,
		uint64_t relayd_id,
		enum lttng_event_output output,
		uint64_t tracefile_size,
		uint64_t tracefile_count,
		uint64_t session_id_per_pid,
		unsigned int monitor,
		unsigned int live_timer_interval)
{
	struct lttng_consumer_channel *channel;

	channel = zmalloc(sizeof(*channel));
	if (channel == NULL) {
		PERROR("malloc struct lttng_consumer_channel");
		goto end;
	}

	channel->key = key;
	channel->refcount = 0;
	channel->session_id = session_id;
	channel->session_id_per_pid = session_id_per_pid;
	channel->uid = uid;
	channel->gid = gid;
	channel->relayd_id = relayd_id;
	channel->output = output;
	channel->tracefile_size = tracefile_size;
	channel->tracefile_count = tracefile_count;
	channel->monitor = monitor;
	channel->live_timer_interval = live_timer_interval;
	pthread_mutex_init(&channel->lock, NULL);
	pthread_mutex_init(&channel->timer_lock, NULL);

	/*
	 * In monitor mode, the streams associated with the channel will be put in
	 * a special list ONLY owned by this channel. So, the refcount is set to 1
	 * here meaning that the channel itself has streams that are referenced.
	 *
	 * On a channel deletion, once the channel is no longer visible, the
	 * refcount is decremented and checked for a zero value to delete it. With
	 * streams in no monitor mode, it will now be safe to destroy the channel.
	 */
	if (!channel->monitor) {
		channel->refcount = 1;
	}

	/* Copy and force a trailing NUL; strncpy alone does not guarantee one. */
	strncpy(channel->pathname, pathname, sizeof(channel->pathname));
	channel->pathname[sizeof(channel->pathname) - 1] = '\0';

	strncpy(channel->name, name, sizeof(channel->name));
	channel->name[sizeof(channel->name) - 1] = '\0';

	lttng_ht_node_init_u64(&channel->node, channel->key);

	channel->wait_fd = -1;	/* No wait fd until the channel is wired up. */

	CDS_INIT_LIST_HEAD(&channel->streams.head);

	DBG("Allocated channel (key %" PRIu64 ")", channel->key);

end:
	return channel;
}
908
/*
 * Add a channel to the global list protected by a mutex.
 *
 * On success 0 is returned else a negative value.
 *
 * Lock ordering: consumer_data.lock, channel->lock, channel->timer_lock,
 * then the RCU read side. On successful insertion of a data channel with
 * a valid wait fd, the channel management thread is notified via the
 * channel pipe (outside the locks).
 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	int ret = 0;
	struct lttng_ht_node_u64 *node;
	struct lttng_ht_iter iter;

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&channel->timer_lock);
	rcu_read_lock();

	lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		/* Channel already exist. Ignore the insertion */
		ERR("Consumer add channel key %" PRIu64 " already exists!",
			channel->key);
		ret = -EEXIST;
		goto end;
	}

	lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);

end:
	rcu_read_unlock();
	pthread_mutex_unlock(&channel->timer_lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	if (!ret && channel->wait_fd != -1 &&
			channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
		notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
	}
	return ret;
}
950
951/*
952 * Allocate the pollfd structure and the local view of the out fds to avoid
953 * doing a lookup in the linked list and concurrency issues when writing is
954 * needed. Called with consumer_data.lock held.
955 *
956 * Returns the number of fds in the structures.
957 */
ffe60014
DG
958static int update_poll_array(struct lttng_consumer_local_data *ctx,
959 struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
960 struct lttng_ht *ht)
3bd1e081 961{
3bd1e081 962 int i = 0;
e4421fec
DG
963 struct lttng_ht_iter iter;
964 struct lttng_consumer_stream *stream;
3bd1e081 965
ffe60014
DG
966 assert(ctx);
967 assert(ht);
968 assert(pollfd);
969 assert(local_stream);
970
3bd1e081 971 DBG("Updating poll fd array");
481d6c57 972 rcu_read_lock();
43c34bc3 973 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
974 /*
975 * Only active streams with an active end point can be added to the
976 * poll set and local stream storage of the thread.
977 *
978 * There is a potential race here for endpoint_status to be updated
979 * just after the check. However, this is OK since the stream(s) will
980 * be deleted once the thread is notified that the end point state has
981 * changed where this function will be called back again.
982 */
983 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
79d4ffb7 984 stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
3bd1e081
MD
985 continue;
986 }
7972aab2
DG
987 /*
988 * This clobbers way too much the debug output. Uncomment that if you
989 * need it for debugging purposes.
990 *
991 * DBG("Active FD %d", stream->wait_fd);
992 */
e4421fec 993 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 994 (*pollfd)[i].events = POLLIN | POLLPRI;
e4421fec 995 local_stream[i] = stream;
3bd1e081
MD
996 i++;
997 }
481d6c57 998 rcu_read_unlock();
3bd1e081
MD
999
1000 /*
50f8ae69 1001 * Insert the consumer_data_pipe at the end of the array and don't
3bd1e081
MD
1002 * increment i so nb_fd is the number of real FD.
1003 */
acdb9057 1004 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
509bb1cf 1005 (*pollfd)[i].events = POLLIN | POLLPRI;
3bd1e081
MD
1006 return i;
1007}
1008
1009/*
1010 * Poll on the should_quit pipe and the command socket return -1 on error and
1011 * should exit, 0 if data is available on the command socket
1012 */
1013int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1014{
1015 int num_rdy;
1016
88f2b785 1017restart:
3bd1e081
MD
1018 num_rdy = poll(consumer_sockpoll, 2, -1);
1019 if (num_rdy == -1) {
88f2b785
MD
1020 /*
1021 * Restart interrupted system call.
1022 */
1023 if (errno == EINTR) {
1024 goto restart;
1025 }
7a57cf92 1026 PERROR("Poll error");
3bd1e081
MD
1027 goto exit;
1028 }
509bb1cf 1029 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
3bd1e081
MD
1030 DBG("consumer_should_quit wake up");
1031 goto exit;
1032 }
1033 return 0;
1034
1035exit:
1036 return -1;
1037}
1038
1039/*
1040 * Set the error socket.
1041 */
ffe60014
DG
1042void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
1043 int sock)
3bd1e081
MD
1044{
1045 ctx->consumer_error_socket = sock;
1046}
1047
1048/*
1049 * Set the command socket path.
1050 */
3bd1e081
MD
1051void lttng_consumer_set_command_sock_path(
1052 struct lttng_consumer_local_data *ctx, char *sock)
1053{
1054 ctx->consumer_command_sock_path = sock;
1055}
1056
1057/*
1058 * Send return code to the session daemon.
1059 * If the socket is not defined, we return 0, it is not a fatal error
1060 */
ffe60014 1061int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
3bd1e081
MD
1062{
1063 if (ctx->consumer_error_socket > 0) {
1064 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
1065 sizeof(enum lttcomm_sessiond_command));
1066 }
1067
1068 return 0;
1069}
1070
1071/*
228b5bf7
DG
1072 * Close all the tracefiles and stream fds and MUST be called when all
1073 * instances are destroyed i.e. when all threads were joined and are ended.
3bd1e081
MD
1074 */
1075void lttng_consumer_cleanup(void)
1076{
e4421fec 1077 struct lttng_ht_iter iter;
ffe60014 1078 struct lttng_consumer_channel *channel;
6065ceec
DG
1079
1080 rcu_read_lock();
3bd1e081 1081
ffe60014
DG
1082 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
1083 node.node) {
702b1ea4 1084 consumer_del_channel(channel);
3bd1e081 1085 }
6065ceec
DG
1086
1087 rcu_read_unlock();
d6ce1df2 1088
d6ce1df2 1089 lttng_ht_destroy(consumer_data.channel_ht);
228b5bf7
DG
1090
1091 cleanup_relayd_ht();
1092
d8ef542d
MD
1093 lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);
1094
228b5bf7
DG
1095 /*
1096 * This HT contains streams that are freed by either the metadata thread or
1097 * the data thread so we do *nothing* on the hash table and simply destroy
1098 * it.
1099 */
1100 lttng_ht_destroy(consumer_data.stream_list_ht);
3bd1e081
MD
1101}
1102
1103/*
1104 * Called from signal handler.
1105 */
1106void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1107{
1108 int ret;
1109 consumer_quit = 1;
6f94560a
MD
1110 do {
1111 ret = write(ctx->consumer_should_quit[1], "4", 1);
1112 } while (ret < 0 && errno == EINTR);
4cec016f 1113 if (ret < 0 || ret != 1) {
7a57cf92 1114 PERROR("write consumer quit");
3bd1e081 1115 }
ab1027f4
DG
1116
1117 DBG("Consumer flag that it should quit");
3bd1e081
MD
1118}
1119
00e2e675
DG
1120void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
1121 off_t orig_offset)
3bd1e081
MD
1122{
1123 int outfd = stream->out_fd;
1124
1125 /*
1126 * This does a blocking write-and-wait on any page that belongs to the
1127 * subbuffer prior to the one we just wrote.
1128 * Don't care about error values, as these are just hints and ways to
1129 * limit the amount of page cache used.
1130 */
ffe60014 1131 if (orig_offset < stream->max_sb_size) {
3bd1e081
MD
1132 return;
1133 }
ffe60014
DG
1134 lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
1135 stream->max_sb_size,
3bd1e081
MD
1136 SYNC_FILE_RANGE_WAIT_BEFORE
1137 | SYNC_FILE_RANGE_WRITE
1138 | SYNC_FILE_RANGE_WAIT_AFTER);
1139 /*
1140 * Give hints to the kernel about how we access the file:
1141 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1142 * we write it.
1143 *
1144 * We need to call fadvise again after the file grows because the
1145 * kernel does not seem to apply fadvise to non-existing parts of the
1146 * file.
1147 *
1148 * Call fadvise _after_ having waited for the page writeback to
1149 * complete because the dirty page writeback semantic is not well
1150 * defined. So it can be expected to lead to lower throughput in
1151 * streaming.
1152 */
ffe60014
DG
1153 posix_fadvise(outfd, orig_offset - stream->max_sb_size,
1154 stream->max_sb_size, POSIX_FADV_DONTNEED);
3bd1e081
MD
1155}
1156
1157/*
1158 * Initialise the necessary environnement :
1159 * - create a new context
1160 * - create the poll_pipe
1161 * - create the should_quit pipe (for signal handler)
1162 * - create the thread pipe (for splice)
1163 *
1164 * Takes a function pointer as argument, this function is called when data is
1165 * available on a buffer. This function is responsible to do the
1166 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1167 * buffer configuration and then kernctl_put_next_subbuf at the end.
1168 *
1169 * Returns a pointer to the new context or NULL on error.
1170 */
1171struct lttng_consumer_local_data *lttng_consumer_create(
1172 enum lttng_consumer_type type,
4078b776 1173 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
d41f73b7 1174 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
1175 int (*recv_channel)(struct lttng_consumer_channel *channel),
1176 int (*recv_stream)(struct lttng_consumer_stream *stream),
30319bcb 1177 int (*update_stream)(uint64_t stream_key, uint32_t state))
3bd1e081 1178{
d8ef542d 1179 int ret;
3bd1e081
MD
1180 struct lttng_consumer_local_data *ctx;
1181
1182 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
1183 consumer_data.type == type);
1184 consumer_data.type = type;
1185
effcf122 1186 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081 1187 if (ctx == NULL) {
7a57cf92 1188 PERROR("allocating context");
3bd1e081
MD
1189 goto error;
1190 }
1191
1192 ctx->consumer_error_socket = -1;
331744e3 1193 ctx->consumer_metadata_socket = -1;
75d83e50 1194 pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
3bd1e081
MD
1195 /* assign the callbacks */
1196 ctx->on_buffer_ready = buffer_ready;
1197 ctx->on_recv_channel = recv_channel;
1198 ctx->on_recv_stream = recv_stream;
1199 ctx->on_update_stream = update_stream;
1200
acdb9057
DG
1201 ctx->consumer_data_pipe = lttng_pipe_open(0);
1202 if (!ctx->consumer_data_pipe) {
3bd1e081
MD
1203 goto error_poll_pipe;
1204 }
1205
1206 ret = pipe(ctx->consumer_should_quit);
1207 if (ret < 0) {
7a57cf92 1208 PERROR("Error creating recv pipe");
3bd1e081
MD
1209 goto error_quit_pipe;
1210 }
1211
1212 ret = pipe(ctx->consumer_thread_pipe);
1213 if (ret < 0) {
7a57cf92 1214 PERROR("Error creating thread pipe");
3bd1e081
MD
1215 goto error_thread_pipe;
1216 }
1217
d8ef542d
MD
1218 ret = pipe(ctx->consumer_channel_pipe);
1219 if (ret < 0) {
1220 PERROR("Error creating channel pipe");
1221 goto error_channel_pipe;
1222 }
1223
13886d2d
DG
1224 ctx->consumer_metadata_pipe = lttng_pipe_open(0);
1225 if (!ctx->consumer_metadata_pipe) {
fb3a43a9
DG
1226 goto error_metadata_pipe;
1227 }
3bd1e081 1228
fb3a43a9
DG
1229 ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
1230 if (ret < 0) {
1231 goto error_splice_pipe;
1232 }
1233
1234 return ctx;
3bd1e081 1235
fb3a43a9 1236error_splice_pipe:
13886d2d 1237 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
fb3a43a9 1238error_metadata_pipe:
d8ef542d
MD
1239 utils_close_pipe(ctx->consumer_channel_pipe);
1240error_channel_pipe:
fb3a43a9 1241 utils_close_pipe(ctx->consumer_thread_pipe);
3bd1e081 1242error_thread_pipe:
d8ef542d 1243 utils_close_pipe(ctx->consumer_should_quit);
3bd1e081 1244error_quit_pipe:
acdb9057 1245 lttng_pipe_destroy(ctx->consumer_data_pipe);
3bd1e081
MD
1246error_poll_pipe:
1247 free(ctx);
1248error:
1249 return NULL;
1250}
1251
1252/*
1253 * Close all fds associated with the instance and free the context.
1254 */
1255void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1256{
4c462e79
MD
1257 int ret;
1258
ab1027f4
DG
1259 DBG("Consumer destroying it. Closing everything.");
1260
4c462e79
MD
1261 ret = close(ctx->consumer_error_socket);
1262 if (ret) {
1263 PERROR("close");
1264 }
331744e3
JD
1265 ret = close(ctx->consumer_metadata_socket);
1266 if (ret) {
1267 PERROR("close");
1268 }
d8ef542d
MD
1269 utils_close_pipe(ctx->consumer_thread_pipe);
1270 utils_close_pipe(ctx->consumer_channel_pipe);
acdb9057 1271 lttng_pipe_destroy(ctx->consumer_data_pipe);
13886d2d 1272 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
d8ef542d 1273 utils_close_pipe(ctx->consumer_should_quit);
fb3a43a9
DG
1274 utils_close_pipe(ctx->consumer_splice_metadata_pipe);
1275
3bd1e081
MD
1276 unlink(ctx->consumer_command_sock_path);
1277 free(ctx);
1278}
1279
6197aea7
DG
1280/*
1281 * Write the metadata stream id on the specified file descriptor.
1282 */
1283static int write_relayd_metadata_id(int fd,
1284 struct lttng_consumer_stream *stream,
ffe60014 1285 struct consumer_relayd_sock_pair *relayd, unsigned long padding)
6197aea7
DG
1286{
1287 int ret;
1d4dfdef 1288 struct lttcomm_relayd_metadata_payload hdr;
6197aea7 1289
1d4dfdef
DG
1290 hdr.stream_id = htobe64(stream->relayd_stream_id);
1291 hdr.padding_size = htobe32(padding);
6197aea7 1292 do {
1d4dfdef 1293 ret = write(fd, (void *) &hdr, sizeof(hdr));
6197aea7 1294 } while (ret < 0 && errno == EINTR);
4cec016f 1295 if (ret < 0 || ret != sizeof(hdr)) {
d7b75ec8
DG
1296 /*
1297 * This error means that the fd's end is closed so ignore the perror
1298 * not to clubber the error output since this can happen in a normal
1299 * code path.
1300 */
1301 if (errno != EPIPE) {
1302 PERROR("write metadata stream id");
1303 }
1304 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
534d2592
DG
1305 /*
1306 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1307 * handle writting the missing part so report that as an error and
1308 * don't lie to the caller.
1309 */
1310 ret = -1;
6197aea7
DG
1311 goto end;
1312 }
1d4dfdef
DG
1313 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1314 stream->relayd_stream_id, padding);
6197aea7
DG
1315
1316end:
1317 return ret;
1318}
1319
3bd1e081 1320/*
09e26845
DG
1321 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1322 * core function for writing trace buffers to either the local filesystem or
1323 * the network.
1324 *
79d4ffb7
DG
1325 * It must be called with the stream lock held.
1326 *
09e26845 1327 * Careful review MUST be put if any changes occur!
3bd1e081
MD
1328 *
1329 * Returns the number of bytes written
1330 */
4078b776 1331ssize_t lttng_consumer_on_read_subbuffer_mmap(
3bd1e081 1332 struct lttng_consumer_local_data *ctx,
1d4dfdef 1333 struct lttng_consumer_stream *stream, unsigned long len,
309167d2
JD
1334 unsigned long padding,
1335 struct lttng_packet_index *index)
3bd1e081 1336{
f02e1e8a 1337 unsigned long mmap_offset;
ffe60014 1338 void *mmap_base;
f02e1e8a
DG
1339 ssize_t ret = 0, written = 0;
1340 off_t orig_offset = stream->out_fd_offset;
1341 /* Default is on the disk */
1342 int outfd = stream->out_fd;
f02e1e8a 1343 struct consumer_relayd_sock_pair *relayd = NULL;
8994307f 1344 unsigned int relayd_hang_up = 0;
f02e1e8a
DG
1345
1346 /* RCU lock for the relayd pointer */
1347 rcu_read_lock();
1348
1349 /* Flag that the current stream if set for network streaming. */
da009f2c 1350 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a
DG
1351 relayd = consumer_find_relayd(stream->net_seq_idx);
1352 if (relayd == NULL) {
56591bac 1353 ret = -EPIPE;
f02e1e8a
DG
1354 goto end;
1355 }
1356 }
1357
1358 /* get the offset inside the fd to mmap */
3bd1e081
MD
1359 switch (consumer_data.type) {
1360 case LTTNG_CONSUMER_KERNEL:
ffe60014 1361 mmap_base = stream->mmap_base;
f02e1e8a 1362 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
56591bac
MD
1363 if (ret != 0) {
1364 PERROR("tracer ctl get_mmap_read_offset");
1365 written = -errno;
1366 goto end;
1367 }
f02e1e8a 1368 break;
7753dea8
MD
1369 case LTTNG_CONSUMER32_UST:
1370 case LTTNG_CONSUMER64_UST:
ffe60014
DG
1371 mmap_base = lttng_ustctl_get_mmap_base(stream);
1372 if (!mmap_base) {
1373 ERR("read mmap get mmap base for stream %s", stream->name);
56591bac 1374 written = -EPERM;
ffe60014
DG
1375 goto end;
1376 }
1377 ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
56591bac
MD
1378 if (ret != 0) {
1379 PERROR("tracer ctl get_mmap_read_offset");
1380 written = ret;
1381 goto end;
1382 }
f02e1e8a 1383 break;
3bd1e081
MD
1384 default:
1385 ERR("Unknown consumer_data type");
1386 assert(0);
1387 }
b9182dd9 1388
f02e1e8a
DG
1389 /* Handle stream on the relayd if the output is on the network */
1390 if (relayd) {
1391 unsigned long netlen = len;
1392
1393 /*
1394 * Lock the control socket for the complete duration of the function
1395 * since from this point on we will use the socket.
1396 */
1397 if (stream->metadata_flag) {
1398 /* Metadata requires the control socket. */
1399 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1d4dfdef 1400 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
f02e1e8a
DG
1401 }
1402
1d4dfdef 1403 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
f02e1e8a
DG
1404 if (ret >= 0) {
1405 /* Use the returned socket. */
1406 outfd = ret;
1407
1408 /* Write metadata stream id before payload */
1409 if (stream->metadata_flag) {
1d4dfdef 1410 ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
f02e1e8a 1411 if (ret < 0) {
f02e1e8a 1412 written = ret;
8994307f
DG
1413 /* Socket operation failed. We consider the relayd dead */
1414 if (ret == -EPIPE || ret == -EINVAL) {
1415 relayd_hang_up = 1;
1416 goto write_error;
1417 }
f02e1e8a
DG
1418 goto end;
1419 }
f02e1e8a 1420 }
8994307f
DG
1421 } else {
1422 /* Socket operation failed. We consider the relayd dead */
1423 if (ret == -EPIPE || ret == -EINVAL) {
1424 relayd_hang_up = 1;
1425 goto write_error;
1426 }
1427 /* Else, use the default set before which is the filesystem. */
f02e1e8a 1428 }
1d4dfdef
DG
1429 } else {
1430 /* No streaming, we have to set the len with the full padding */
1431 len += padding;
1624d5b7
JD
1432
1433 /*
1434 * Check if we need to change the tracefile before writing the packet.
1435 */
1436 if (stream->chan->tracefile_size > 0 &&
1437 (stream->tracefile_size_current + len) >
1438 stream->chan->tracefile_size) {
fe4477ee
JD
1439 ret = utils_rotate_stream_file(stream->chan->pathname,
1440 stream->name, stream->chan->tracefile_size,
1441 stream->chan->tracefile_count, stream->uid, stream->gid,
309167d2
JD
1442 stream->out_fd, &(stream->tracefile_count_current),
1443 &stream->out_fd);
1624d5b7
JD
1444 if (ret < 0) {
1445 ERR("Rotating output file");
1446 goto end;
1447 }
309167d2
JD
1448 outfd = stream->out_fd;
1449
1450 if (stream->index_fd >= 0) {
1451 ret = index_create_file(stream->chan->pathname,
1452 stream->name, stream->uid, stream->gid,
1453 stream->chan->tracefile_size,
1454 stream->tracefile_count_current);
1455 if (ret < 0) {
1456 goto end;
1457 }
1458 stream->index_fd = ret;
1459 }
1460
a6976990
DG
1461 /* Reset current size because we just perform a rotation. */
1462 stream->tracefile_size_current = 0;
a1ae300f
JD
1463 stream->out_fd_offset = 0;
1464 orig_offset = 0;
1624d5b7
JD
1465 }
1466 stream->tracefile_size_current += len;
309167d2
JD
1467 if (index) {
1468 index->offset = htobe64(stream->out_fd_offset);
1469 }
f02e1e8a
DG
1470 }
1471
1472 while (len > 0) {
1473 do {
ffe60014 1474 ret = write(outfd, mmap_base + mmap_offset, len);
f02e1e8a 1475 } while (ret < 0 && errno == EINTR);
1d4dfdef 1476 DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
f02e1e8a 1477 if (ret < 0) {
c5c45efa
DG
1478 /*
1479 * This is possible if the fd is closed on the other side (outfd)
1480 * or any write problem. It can be verbose a bit for a normal
1481 * execution if for instance the relayd is stopped abruptly. This
1482 * can happen so set this to a DBG statement.
1483 */
1484 DBG("Error in file write mmap");
f02e1e8a 1485 if (written == 0) {
56591bac 1486 written = -errno;
f02e1e8a 1487 }
8994307f
DG
1488 /* Socket operation failed. We consider the relayd dead */
1489 if (errno == EPIPE || errno == EINVAL) {
1490 relayd_hang_up = 1;
1491 goto write_error;
1492 }
f02e1e8a
DG
1493 goto end;
1494 } else if (ret > len) {
77c7c900 1495 PERROR("Error in file write (ret %zd > len %lu)", ret, len);
f02e1e8a
DG
1496 written += ret;
1497 goto end;
1498 } else {
1499 len -= ret;
1500 mmap_offset += ret;
1501 }
f02e1e8a
DG
1502
1503 /* This call is useless on a socket so better save a syscall. */
1504 if (!relayd) {
1505 /* This won't block, but will start writeout asynchronously */
1506 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
1507 SYNC_FILE_RANGE_WRITE);
1508 stream->out_fd_offset += ret;
1509 }
e5d1a9b3 1510 stream->output_written += ret;
f02e1e8a
DG
1511 written += ret;
1512 }
1513 lttng_consumer_sync_trace_file(stream, orig_offset);
1514
8994307f
DG
1515write_error:
1516 /*
1517 * This is a special case that the relayd has closed its socket. Let's
1518 * cleanup the relayd object and all associated streams.
1519 */
1520 if (relayd && relayd_hang_up) {
1521 cleanup_relayd(relayd, ctx);
1522 }
1523
f02e1e8a
DG
1524end:
1525 /* Unlock only if ctrl socket used */
1526 if (relayd && stream->metadata_flag) {
1527 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1528 }
1529
1530 rcu_read_unlock();
1531 return written;
3bd1e081
MD
1532}
1533
1534/*
1535 * Splice the data from the ring buffer to the tracefile.
1536 *
79d4ffb7
DG
1537 * It must be called with the stream lock held.
1538 *
3bd1e081
MD
1539 * Returns the number of bytes spliced.
1540 */
4078b776 1541ssize_t lttng_consumer_on_read_subbuffer_splice(
3bd1e081 1542 struct lttng_consumer_local_data *ctx,
1d4dfdef 1543 struct lttng_consumer_stream *stream, unsigned long len,
309167d2
JD
1544 unsigned long padding,
1545 struct lttng_packet_index *index)
3bd1e081 1546{
f02e1e8a
DG
1547 ssize_t ret = 0, written = 0, ret_splice = 0;
1548 loff_t offset = 0;
1549 off_t orig_offset = stream->out_fd_offset;
1550 int fd = stream->wait_fd;
1551 /* Default is on the disk */
1552 int outfd = stream->out_fd;
f02e1e8a 1553 struct consumer_relayd_sock_pair *relayd = NULL;
fb3a43a9 1554 int *splice_pipe;
8994307f 1555 unsigned int relayd_hang_up = 0;
f02e1e8a 1556
3bd1e081
MD
1557 switch (consumer_data.type) {
1558 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1559 break;
7753dea8
MD
1560 case LTTNG_CONSUMER32_UST:
1561 case LTTNG_CONSUMER64_UST:
f02e1e8a 1562 /* Not supported for user space tracing */
3bd1e081
MD
1563 return -ENOSYS;
1564 default:
1565 ERR("Unknown consumer_data type");
1566 assert(0);
3bd1e081
MD
1567 }
1568
f02e1e8a
DG
1569 /* RCU lock for the relayd pointer */
1570 rcu_read_lock();
1571
1572 /* Flag that the current stream if set for network streaming. */
da009f2c 1573 if (stream->net_seq_idx != (uint64_t) -1ULL) {
f02e1e8a
DG
1574 relayd = consumer_find_relayd(stream->net_seq_idx);
1575 if (relayd == NULL) {
56591bac 1576 ret = -EPIPE;
f02e1e8a
DG
1577 goto end;
1578 }
1579 }
1580
fb3a43a9
DG
1581 /*
1582 * Choose right pipe for splice. Metadata and trace data are handled by
1583 * different threads hence the use of two pipes in order not to race or
1584 * corrupt the written data.
1585 */
1586 if (stream->metadata_flag) {
1587 splice_pipe = ctx->consumer_splice_metadata_pipe;
1588 } else {
1589 splice_pipe = ctx->consumer_thread_pipe;
1590 }
1591
f02e1e8a 1592 /* Write metadata stream id before payload */
1d4dfdef
DG
1593 if (relayd) {
1594 int total_len = len;
f02e1e8a 1595
1d4dfdef
DG
1596 if (stream->metadata_flag) {
1597 /*
1598 * Lock the control socket for the complete duration of the function
1599 * since from this point on we will use the socket.
1600 */
1601 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1602
1603 ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
1604 padding);
1605 if (ret < 0) {
1606 written = ret;
8994307f
DG
1607 /* Socket operation failed. We consider the relayd dead */
1608 if (ret == -EBADF) {
1609 WARN("Remote relayd disconnected. Stopping");
1610 relayd_hang_up = 1;
1611 goto write_error;
1612 }
1d4dfdef
DG
1613 goto end;
1614 }
1615
1616 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1617 }
1618
1619 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1620 if (ret >= 0) {
1621 /* Use the returned socket. */
1622 outfd = ret;
1623 } else {
8994307f
DG
1624 /* Socket operation failed. We consider the relayd dead */
1625 if (ret == -EBADF) {
1626 WARN("Remote relayd disconnected. Stopping");
1627 relayd_hang_up = 1;
1628 goto write_error;
1629 }
f02e1e8a
DG
1630 goto end;
1631 }
1d4dfdef
DG
1632 } else {
1633 /* No streaming, we have to set the len with the full padding */
1634 len += padding;
1624d5b7
JD
1635
1636 /*
1637 * Check if we need to change the tracefile before writing the packet.
1638 */
1639 if (stream->chan->tracefile_size > 0 &&
1640 (stream->tracefile_size_current + len) >
1641 stream->chan->tracefile_size) {
fe4477ee
JD
1642 ret = utils_rotate_stream_file(stream->chan->pathname,
1643 stream->name, stream->chan->tracefile_size,
1644 stream->chan->tracefile_count, stream->uid, stream->gid,
309167d2
JD
1645 stream->out_fd, &(stream->tracefile_count_current),
1646 &stream->out_fd);
1624d5b7
JD
1647 if (ret < 0) {
1648 ERR("Rotating output file");
1649 goto end;
1650 }
309167d2
JD
1651 outfd = stream->out_fd;
1652
1653 if (stream->index_fd >= 0) {
1654 ret = index_create_file(stream->chan->pathname,
1655 stream->name, stream->uid, stream->gid,
1656 stream->chan->tracefile_size,
1657 stream->tracefile_count_current);
1658 if (ret < 0) {
1659 goto end;
1660 }
1661 stream->index_fd = ret;
1662 }
1663
a6976990
DG
1664 /* Reset current size because we just perform a rotation. */
1665 stream->tracefile_size_current = 0;
a1ae300f
JD
1666 stream->out_fd_offset = 0;
1667 orig_offset = 0;
1624d5b7
JD
1668 }
1669 stream->tracefile_size_current += len;
309167d2 1670 index->offset = htobe64(stream->out_fd_offset);
f02e1e8a
DG
1671 }
1672
1673 while (len > 0) {
1d4dfdef
DG
1674 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1675 (unsigned long)offset, len, fd, splice_pipe[1]);
fb3a43a9 1676 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
f02e1e8a
DG
1677 SPLICE_F_MOVE | SPLICE_F_MORE);
1678 DBG("splice chan to pipe, ret %zd", ret_splice);
1679 if (ret_splice < 0) {
1680 PERROR("Error in relay splice");
1681 if (written == 0) {
1682 written = ret_splice;
1683 }
1684 ret = errno;
1685 goto splice_error;
1686 }
1687
1688 /* Handle stream on the relayd if the output is on the network */
1689 if (relayd) {
1690 if (stream->metadata_flag) {
1d4dfdef
DG
1691 size_t metadata_payload_size =
1692 sizeof(struct lttcomm_relayd_metadata_payload);
1693
f02e1e8a 1694 /* Update counter to fit the spliced data */
1d4dfdef
DG
1695 ret_splice += metadata_payload_size;
1696 len += metadata_payload_size;
f02e1e8a
DG
1697 /*
1698 * We do this so the return value can match the len passed as
1699 * argument to this function.
1700 */
1d4dfdef 1701 written -= metadata_payload_size;
f02e1e8a
DG
1702 }
1703 }
1704
1705 /* Splice data out */
fb3a43a9 1706 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
f02e1e8a 1707 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
1d4dfdef 1708 DBG("Consumer splice pipe to file, ret %zd", ret_splice);
f02e1e8a
DG
1709 if (ret_splice < 0) {
1710 PERROR("Error in file splice");
1711 if (written == 0) {
1712 written = ret_splice;
1713 }
8994307f 1714 /* Socket operation failed. We consider the relayd dead */
00c8752b 1715 if (errno == EBADF || errno == EPIPE) {
8994307f
DG
1716 WARN("Remote relayd disconnected. Stopping");
1717 relayd_hang_up = 1;
1718 goto write_error;
1719 }
f02e1e8a
DG
1720 ret = errno;
1721 goto splice_error;
1722 } else if (ret_splice > len) {
1723 errno = EINVAL;
1724 PERROR("Wrote more data than requested %zd (len: %lu)",
1725 ret_splice, len);
1726 written += ret_splice;
1727 ret = errno;
1728 goto splice_error;
1729 }
1730 len -= ret_splice;
1731
1732 /* This call is useless on a socket so better save a syscall. */
1733 if (!relayd) {
1734 /* This won't block, but will start writeout asynchronously */
1735 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
1736 SYNC_FILE_RANGE_WRITE);
1737 stream->out_fd_offset += ret_splice;
1738 }
e5d1a9b3 1739 stream->output_written += ret_splice;
f02e1e8a
DG
1740 written += ret_splice;
1741 }
1742 lttng_consumer_sync_trace_file(stream, orig_offset);
1743
1744 ret = ret_splice;
1745
1746 goto end;
1747
8994307f
DG
1748write_error:
1749 /*
1750 * This is a special case that the relayd has closed its socket. Let's
1751 * cleanup the relayd object and all associated streams.
1752 */
1753 if (relayd && relayd_hang_up) {
1754 cleanup_relayd(relayd, ctx);
1755 /* Skip splice error so the consumer does not fail */
1756 goto end;
1757 }
1758
f02e1e8a
DG
1759splice_error:
1760 /* send the appropriate error description to sessiond */
1761 switch (ret) {
f02e1e8a 1762 case EINVAL:
f73fabfd 1763 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
1764 break;
1765 case ENOMEM:
f73fabfd 1766 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
1767 break;
1768 case ESPIPE:
f73fabfd 1769 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
1770 break;
1771 }
1772
1773end:
1774 if (relayd && stream->metadata_flag) {
1775 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1776 }
1777
1778 rcu_read_unlock();
1779 return written;
3bd1e081
MD
1780}
1781
1782/*
1783 * Take a snapshot for a specific fd
1784 *
1785 * Returns 0 on success, < 0 on error
1786 */
ffe60014 1787int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
1788{
1789 switch (consumer_data.type) {
1790 case LTTNG_CONSUMER_KERNEL:
ffe60014 1791 return lttng_kconsumer_take_snapshot(stream);
7753dea8
MD
1792 case LTTNG_CONSUMER32_UST:
1793 case LTTNG_CONSUMER64_UST:
ffe60014 1794 return lttng_ustconsumer_take_snapshot(stream);
3bd1e081
MD
1795 default:
1796 ERR("Unknown consumer_data type");
1797 assert(0);
1798 return -ENOSYS;
1799 }
3bd1e081
MD
1800}
1801
1802/*
1803 * Get the produced position
1804 *
1805 * Returns 0 on success, < 0 on error
1806 */
ffe60014 1807int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
1808 unsigned long *pos)
1809{
1810 switch (consumer_data.type) {
1811 case LTTNG_CONSUMER_KERNEL:
ffe60014 1812 return lttng_kconsumer_get_produced_snapshot(stream, pos);
7753dea8
MD
1813 case LTTNG_CONSUMER32_UST:
1814 case LTTNG_CONSUMER64_UST:
ffe60014 1815 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
3bd1e081
MD
1816 default:
1817 ERR("Unknown consumer_data type");
1818 assert(0);
1819 return -ENOSYS;
1820 }
1821}
1822
1823int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1824 int sock, struct pollfd *consumer_sockpoll)
1825{
1826 switch (consumer_data.type) {
1827 case LTTNG_CONSUMER_KERNEL:
1828 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
1829 case LTTNG_CONSUMER32_UST:
1830 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1831 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1832 default:
1833 ERR("Unknown consumer_data type");
1834 assert(0);
1835 return -ENOSYS;
1836 }
1837}
1838
43c34bc3
DG
1839/*
1840 * Iterate over all streams of the hashtable and free them properly.
1841 *
1842 * WARNING: *MUST* be used with data stream only.
1843 */
1844static void destroy_data_stream_ht(struct lttng_ht *ht)
1845{
43c34bc3
DG
1846 struct lttng_ht_iter iter;
1847 struct lttng_consumer_stream *stream;
1848
1849 if (ht == NULL) {
1850 return;
1851 }
1852
1853 rcu_read_lock();
1854 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
5c540210
DG
1855 /*
1856 * Ignore return value since we are currently cleaning up so any error
1857 * can't be handled.
1858 */
1859 (void) consumer_del_stream(stream, ht);
43c34bc3
DG
1860 }
1861 rcu_read_unlock();
1862
1863 lttng_ht_destroy(ht);
1864}
1865
fb3a43a9 1866/*
f724d81e 1867 * Iterate over all streams of the hashtable and free them properly.
e316aad5
DG
1868 *
1869 * XXX: Should not be only for metadata stream or else use an other name.
fb3a43a9
DG
1870 */
1871static void destroy_stream_ht(struct lttng_ht *ht)
1872{
fb3a43a9
DG
1873 struct lttng_ht_iter iter;
1874 struct lttng_consumer_stream *stream;
1875
1876 if (ht == NULL) {
1877 return;
1878 }
1879
d09e1200 1880 rcu_read_lock();
58b1f425 1881 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
5c540210
DG
1882 /*
1883 * Ignore return value since we are currently cleaning up so any error
1884 * can't be handled.
1885 */
1886 (void) consumer_del_metadata_stream(stream, ht);
fb3a43a9 1887 }
d09e1200 1888 rcu_read_unlock();
fb3a43a9
DG
1889
1890 lttng_ht_destroy(ht);
1891}
1892
d88aee68
DG
1893void lttng_consumer_close_metadata(void)
1894{
1895 switch (consumer_data.type) {
1896 case LTTNG_CONSUMER_KERNEL:
1897 /*
1898 * The Kernel consumer has a different metadata scheme so we don't
1899 * close anything because the stream will be closed by the session
1900 * daemon.
1901 */
1902 break;
1903 case LTTNG_CONSUMER32_UST:
1904 case LTTNG_CONSUMER64_UST:
1905 /*
1906 * Close all metadata streams. The metadata hash table is passed and
1907 * this call iterates over it by closing all wakeup fd. This is safe
1908 * because at this point we are sure that the metadata producer is
1909 * either dead or blocked.
1910 */
1911 lttng_ustconsumer_close_metadata(metadata_ht);
1912 break;
1913 default:
1914 ERR("Unknown consumer_data type");
1915 assert(0);
1916 }
1917}
1918
fb3a43a9
DG
1919/*
1920 * Clean up a metadata stream and free its memory.
1921 */
e316aad5
DG
1922void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
1923 struct lttng_ht *ht)
fb3a43a9
DG
1924{
1925 int ret;
e316aad5
DG
1926 struct lttng_ht_iter iter;
1927 struct lttng_consumer_channel *free_chan = NULL;
fb3a43a9
DG
1928 struct consumer_relayd_sock_pair *relayd;
1929
1930 assert(stream);
1931 /*
1932 * This call should NEVER receive regular stream. It must always be
1933 * metadata stream and this is crucial for data structure synchronization.
1934 */
1935 assert(stream->metadata_flag);
1936
e316aad5
DG
1937 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
1938
1939 if (ht == NULL) {
1940 /* Means the stream was allocated but not successfully added */
ffe60014 1941 goto free_stream_rcu;
e316aad5
DG
1942 }
1943
74251bb8 1944 pthread_mutex_lock(&consumer_data.lock);
a9838785 1945 pthread_mutex_lock(&stream->chan->lock);
8994307f
DG
1946 pthread_mutex_lock(&stream->lock);
1947
fb3a43a9
DG
1948 switch (consumer_data.type) {
1949 case LTTNG_CONSUMER_KERNEL:
1950 if (stream->mmap_base != NULL) {
1951 ret = munmap(stream->mmap_base, stream->mmap_len);
1952 if (ret != 0) {
1953 PERROR("munmap metadata stream");
1954 }
1955 }
4c95e622
JD
1956 if (stream->wait_fd >= 0) {
1957 ret = close(stream->wait_fd);
1958 if (ret < 0) {
1959 PERROR("close kernel metadata wait_fd");
1960 }
1961 }
fb3a43a9
DG
1962 break;
1963 case LTTNG_CONSUMER32_UST:
1964 case LTTNG_CONSUMER64_UST:
04ef1097
MD
1965 if (stream->monitor) {
1966 /* close the write-side in close_metadata */
1967 ret = close(stream->ust_metadata_poll_pipe[0]);
1968 if (ret < 0) {
1969 PERROR("Close UST metadata read-side poll pipe");
1970 }
1971 }
fb3a43a9
DG
1972 lttng_ustconsumer_del_stream(stream);
1973 break;
1974 default:
1975 ERR("Unknown consumer_data type");
1976 assert(0);
e316aad5 1977 goto end;
fb3a43a9 1978 }
fb3a43a9 1979
c869f647 1980 rcu_read_lock();
58b1f425 1981 iter.iter.node = &stream->node.node;
c869f647
DG
1982 ret = lttng_ht_del(ht, &iter);
1983 assert(!ret);
ca22feea 1984
d8ef542d
MD
1985 iter.iter.node = &stream->node_channel_id.node;
1986 ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
1987 assert(!ret);
1988
ca22feea
DG
1989 iter.iter.node = &stream->node_session_id.node;
1990 ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
1991 assert(!ret);
c869f647
DG
1992 rcu_read_unlock();
1993
fb3a43a9
DG
1994 if (stream->out_fd >= 0) {
1995 ret = close(stream->out_fd);
1996 if (ret) {
1997 PERROR("close");
1998 }
1999 }
2000
fb3a43a9
DG
2001 /* Check and cleanup relayd */
2002 rcu_read_lock();
2003 relayd = consumer_find_relayd(stream->net_seq_idx);
2004 if (relayd != NULL) {
2005 uatomic_dec(&relayd->refcount);
2006 assert(uatomic_read(&relayd->refcount) >= 0);
2007
2008 /* Closing streams requires to lock the control socket. */
2009 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
2010 ret = relayd_send_close_stream(&relayd->control_sock,
2011 stream->relayd_stream_id, stream->next_net_seq_num - 1);
2012 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
2013 if (ret < 0) {
2014 DBG("Unable to close stream on the relayd. Continuing");
2015 /*
2016 * Continue here. There is nothing we can do for the relayd.
2017 * Chances are that the relayd has closed the socket so we just
2018 * continue cleaning up.
2019 */
2020 }
2021
2022 /* Both conditions are met, we destroy the relayd. */
2023 if (uatomic_read(&relayd->refcount) == 0 &&
2024 uatomic_read(&relayd->destroy_flag)) {
51230d70 2025 consumer_destroy_relayd(relayd);
fb3a43a9
DG
2026 }
2027 }
2028 rcu_read_unlock();
2029
2030 /* Atomically decrement channel refcount since other threads can use it. */
f2ad556d 2031 if (!uatomic_sub_return(&stream->chan->refcount, 1)
ffe60014 2032 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
c30aaa51 2033 /* Go for channel deletion! */
e316aad5 2034 free_chan = stream->chan;
fb3a43a9
DG
2035 }
2036
e316aad5 2037end:
73811ecc
DG
2038 /*
2039 * Nullify the stream reference so it is not used after deletion. The
5e41ebe1
MD
2040 * channel lock MUST be acquired before being able to check for
2041 * a NULL pointer value.
73811ecc
DG
2042 */
2043 stream->chan->metadata_stream = NULL;
2044
8994307f 2045 pthread_mutex_unlock(&stream->lock);
a9838785 2046 pthread_mutex_unlock(&stream->chan->lock);
74251bb8 2047 pthread_mutex_unlock(&consumer_data.lock);
e316aad5
DG
2048
2049 if (free_chan) {
2050 consumer_del_channel(free_chan);
2051 }
2052
ffe60014
DG
2053free_stream_rcu:
2054 call_rcu(&stream->node.head, free_stream_rcu);
fb3a43a9
DG
2055}
2056
2057/*
2058 * Action done with the metadata stream when adding it to the consumer internal
2059 * data structures to handle it.
2060 */
5ab66908 2061int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
fb3a43a9 2062{
5ab66908 2063 struct lttng_ht *ht = metadata_ht;
e316aad5 2064 int ret = 0;
76082088 2065 struct lttng_ht_iter iter;
d88aee68 2066 struct lttng_ht_node_u64 *node;
fb3a43a9 2067
e316aad5
DG
2068 assert(stream);
2069 assert(ht);
2070
d88aee68 2071 DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
e316aad5
DG
2072
2073 pthread_mutex_lock(&consumer_data.lock);
a9838785 2074 pthread_mutex_lock(&stream->chan->lock);
ec6ea7d0 2075 pthread_mutex_lock(&stream->chan->timer_lock);
2e818a6a 2076 pthread_mutex_lock(&stream->lock);
e316aad5 2077
e316aad5
DG
2078 /*
2079 * From here, refcounts are updated so be _careful_ when returning an error
2080 * after this point.
2081 */
2082
fb3a43a9 2083 rcu_read_lock();
76082088
DG
2084
2085 /*
2086 * Lookup the stream just to make sure it does not exist in our internal
2087 * state. This should NEVER happen.
2088 */
d88aee68
DG
2089 lttng_ht_lookup(ht, &stream->key, &iter);
2090 node = lttng_ht_iter_get_node_u64(&iter);
76082088
DG
2091 assert(!node);
2092
e316aad5 2093 /*
ffe60014
DG
2094 * When nb_init_stream_left reaches 0, we don't need to trigger any action
2095 * in terms of destroying the associated channel, because the action that
e316aad5
DG
2096 * causes the count to become 0 also causes a stream to be added. The
2097 * channel deletion will thus be triggered by the following removal of this
2098 * stream.
2099 */
ffe60014 2100 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
f2ad556d
MD
2101 /* Increment refcount before decrementing nb_init_stream_left */
2102 cmm_smp_wmb();
ffe60014 2103 uatomic_dec(&stream->chan->nb_init_stream_left);
e316aad5
DG
2104 }
2105
d88aee68 2106 lttng_ht_add_unique_u64(ht, &stream->node);
ca22feea 2107
d8ef542d
MD
2108 lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
2109 &stream->node_channel_id);
2110
ca22feea
DG
2111 /*
2112 * Add stream to the stream_list_ht of the consumer data. No need to steal
2113 * the key since the HT does not use it and we allow to add redundant keys
2114 * into this table.
2115 */
d88aee68 2116 lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
ca22feea 2117
fb3a43a9 2118 rcu_read_unlock();
e316aad5 2119
2e818a6a 2120 pthread_mutex_unlock(&stream->lock);
a9838785 2121 pthread_mutex_unlock(&stream->chan->lock);
ec6ea7d0 2122 pthread_mutex_unlock(&stream->chan->timer_lock);
e316aad5
DG
2123 pthread_mutex_unlock(&consumer_data.lock);
2124 return ret;
fb3a43a9
DG
2125}
2126
8994307f
DG
2127/*
2128 * Delete data stream that are flagged for deletion (endpoint_status).
2129 */
2130static void validate_endpoint_status_data_stream(void)
2131{
2132 struct lttng_ht_iter iter;
2133 struct lttng_consumer_stream *stream;
2134
2135 DBG("Consumer delete flagged data stream");
2136
2137 rcu_read_lock();
2138 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
2139 /* Validate delete flag of the stream */
79d4ffb7 2140 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
8994307f
DG
2141 continue;
2142 }
2143 /* Delete it right now */
2144 consumer_del_stream(stream, data_ht);
2145 }
2146 rcu_read_unlock();
2147}
2148
2149/*
2150 * Delete metadata stream that are flagged for deletion (endpoint_status).
2151 */
2152static void validate_endpoint_status_metadata_stream(
2153 struct lttng_poll_event *pollset)
2154{
2155 struct lttng_ht_iter iter;
2156 struct lttng_consumer_stream *stream;
2157
2158 DBG("Consumer delete flagged metadata stream");
2159
2160 assert(pollset);
2161
2162 rcu_read_lock();
2163 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
2164 /* Validate delete flag of the stream */
79d4ffb7 2165 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
8994307f
DG
2166 continue;
2167 }
2168 /*
2169 * Remove from pollset so the metadata thread can continue without
2170 * blocking on a deleted stream.
2171 */
2172 lttng_poll_del(pollset, stream->wait_fd);
2173
2174 /* Delete it right now */
2175 consumer_del_metadata_stream(stream, metadata_ht);
2176 }
2177 rcu_read_unlock();
2178}
2179
fb3a43a9
DG
2180/*
2181 * Thread polls on metadata file descriptor and write them on disk or on the
2182 * network.
2183 */
7d980def 2184void *consumer_thread_metadata_poll(void *data)
fb3a43a9 2185{
1fc79fb4 2186 int ret, i, pollfd, err = -1;
fb3a43a9 2187 uint32_t revents, nb_fd;
e316aad5 2188 struct lttng_consumer_stream *stream = NULL;
fb3a43a9 2189 struct lttng_ht_iter iter;
d88aee68 2190 struct lttng_ht_node_u64 *node;
fb3a43a9
DG
2191 struct lttng_poll_event events;
2192 struct lttng_consumer_local_data *ctx = data;
2193 ssize_t len;
2194
2195 rcu_register_thread();
2196
1fc79fb4
MD
2197 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2198
9ce5646a
MD
2199 health_code_update();
2200
d88aee68 2201 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
04bb2b64
DG
2202 if (!metadata_ht) {
2203 /* ENOMEM at this point. Better to bail out. */
d8ef542d 2204 goto end_ht;
04bb2b64
DG
2205 }
2206
fb3a43a9
DG
2207 DBG("Thread metadata poll started");
2208
fb3a43a9
DG
2209 /* Size is set to 1 for the consumer_metadata pipe */
2210 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2211 if (ret < 0) {
2212 ERR("Poll set creation failed");
d8ef542d 2213 goto end_poll;
fb3a43a9
DG
2214 }
2215
13886d2d
DG
2216 ret = lttng_poll_add(&events,
2217 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
fb3a43a9
DG
2218 if (ret < 0) {
2219 goto end;
2220 }
2221
2222 /* Main loop */
2223 DBG("Metadata main loop started");
2224
2225 while (1) {
9ce5646a
MD
2226 health_code_update();
2227
fb3a43a9 2228 /* Only the metadata pipe is set */
d21b0d71 2229 if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
1fc79fb4 2230 err = 0; /* All is OK */
fb3a43a9
DG
2231 goto end;
2232 }
2233
2234restart:
d21b0d71 2235 DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2236 health_poll_entry();
fb3a43a9 2237 ret = lttng_poll_wait(&events, -1);
9ce5646a 2238 health_poll_exit();
fb3a43a9
DG
2239 DBG("Metadata event catched in thread");
2240 if (ret < 0) {
2241 if (errno == EINTR) {
e316aad5 2242 ERR("Poll EINTR catched");
fb3a43a9
DG
2243 goto restart;
2244 }
2245 goto error;
2246 }
2247
0d9c5d77
DG
2248 nb_fd = ret;
2249
e316aad5 2250 /* From here, the event is a metadata wait fd */
fb3a43a9 2251 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2252 health_code_update();
2253
fb3a43a9
DG
2254 revents = LTTNG_POLL_GETEV(&events, i);
2255 pollfd = LTTNG_POLL_GETFD(&events, i);
2256
13886d2d 2257 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
4adabd61 2258 if (revents & (LPOLLERR | LPOLLHUP )) {
fb3a43a9
DG
2259 DBG("Metadata thread pipe hung up");
2260 /*
2261 * Remove the pipe from the poll set and continue the loop
2262 * since their might be data to consume.
2263 */
13886d2d
DG
2264 lttng_poll_del(&events,
2265 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2266 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
fb3a43a9
DG
2267 continue;
2268 } else if (revents & LPOLLIN) {
13886d2d
DG
2269 ssize_t pipe_len;
2270
2271 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
2272 &stream, sizeof(stream));
2273 if (pipe_len < 0) {
6a00837f 2274 ERR("read metadata stream, ret: %zd", pipe_len);
fb3a43a9 2275 /*
13886d2d 2276 * Continue here to handle the rest of the streams.
fb3a43a9
DG
2277 */
2278 continue;
2279 }
2280
8994307f
DG
2281 /* A NULL stream means that the state has changed. */
2282 if (stream == NULL) {
2283 /* Check for deleted streams. */
2284 validate_endpoint_status_metadata_stream(&events);
3714380f 2285 goto restart;
8994307f
DG
2286 }
2287
fb3a43a9
DG
2288 DBG("Adding metadata stream %d to poll set",
2289 stream->wait_fd);
2290
fb3a43a9
DG
2291 /* Add metadata stream to the global poll events list */
2292 lttng_poll_add(&events, stream->wait_fd,
2293 LPOLLIN | LPOLLPRI);
fb3a43a9
DG
2294 }
2295
e316aad5 2296 /* Handle other stream */
fb3a43a9
DG
2297 continue;
2298 }
2299
d09e1200 2300 rcu_read_lock();
d88aee68
DG
2301 {
2302 uint64_t tmp_id = (uint64_t) pollfd;
2303
2304 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2305 }
2306 node = lttng_ht_iter_get_node_u64(&iter);
e316aad5 2307 assert(node);
fb3a43a9
DG
2308
2309 stream = caa_container_of(node, struct lttng_consumer_stream,
58b1f425 2310 node);
fb3a43a9 2311
e316aad5 2312 /* Check for error event */
4adabd61 2313 if (revents & (LPOLLERR | LPOLLHUP)) {
e316aad5 2314 DBG("Metadata fd %d is hup|err.", pollfd);
fb3a43a9
DG
2315 if (!stream->hangup_flush_done
2316 && (consumer_data.type == LTTNG_CONSUMER32_UST
2317 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2318 DBG("Attempting to flush and consume the UST buffers");
2319 lttng_ustconsumer_on_stream_hangup(stream);
2320
2321 /* We just flushed the stream now read it. */
4bb94b75 2322 do {
9ce5646a
MD
2323 health_code_update();
2324
4bb94b75
DG
2325 len = ctx->on_buffer_ready(stream, ctx);
2326 /*
2327 * We don't check the return value here since if we get
2328 * a negative len, it means an error occured thus we
2329 * simply remove it from the poll set and free the
2330 * stream.
2331 */
2332 } while (len > 0);
fb3a43a9
DG
2333 }
2334
fb3a43a9 2335 lttng_poll_del(&events, stream->wait_fd);
e316aad5
DG
2336 /*
2337 * This call update the channel states, closes file descriptors
2338 * and securely free the stream.
2339 */
2340 consumer_del_metadata_stream(stream, metadata_ht);
2341 } else if (revents & (LPOLLIN | LPOLLPRI)) {
2342 /* Get the data out of the metadata file descriptor */
2343 DBG("Metadata available on fd %d", pollfd);
2344 assert(stream->wait_fd == pollfd);
2345
04ef1097 2346 do {
9ce5646a
MD
2347 health_code_update();
2348
04ef1097
MD
2349 len = ctx->on_buffer_ready(stream, ctx);
2350 /*
2351 * We don't check the return value here since if we get
2352 * a negative len, it means an error occured thus we
2353 * simply remove it from the poll set and free the
2354 * stream.
2355 */
2356 } while (len > 0);
2357
e316aad5 2358 /* It's ok to have an unavailable sub-buffer */
b64403e3 2359 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2360 /* Clean up stream from consumer and free it. */
2361 lttng_poll_del(&events, stream->wait_fd);
2362 consumer_del_metadata_stream(stream, metadata_ht);
e316aad5 2363 }
fb3a43a9 2364 }
e316aad5
DG
2365
2366 /* Release RCU lock for the stream looked up */
d09e1200 2367 rcu_read_unlock();
fb3a43a9
DG
2368 }
2369 }
2370
1fc79fb4
MD
2371 /* All is OK */
2372 err = 0;
fb3a43a9
DG
2373error:
2374end:
2375 DBG("Metadata poll thread exiting");
fb3a43a9 2376
d8ef542d
MD
2377 lttng_poll_clean(&events);
2378end_poll:
04bb2b64 2379 destroy_stream_ht(metadata_ht);
d8ef542d 2380end_ht:
1fc79fb4
MD
2381 if (err) {
2382 health_error();
2383 ERR("Health error occurred in %s", __func__);
2384 }
2385 health_unregister(health_consumerd);
fb3a43a9
DG
2386 rcu_unregister_thread();
2387 return NULL;
2388}
2389
3bd1e081 2390/*
e4421fec 2391 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2392 * it to tracefile if necessary.
2393 */
7d980def 2394void *consumer_thread_data_poll(void *data)
3bd1e081 2395{
1fc79fb4 2396 int num_rdy, num_hup, high_prio, ret, i, err = -1;
3bd1e081
MD
2397 struct pollfd *pollfd = NULL;
2398 /* local view of the streams */
c869f647 2399 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
3bd1e081
MD
2400 /* local view of consumer_data.fds_count */
2401 int nb_fd = 0;
3bd1e081 2402 struct lttng_consumer_local_data *ctx = data;
00e2e675 2403 ssize_t len;
3bd1e081 2404
e7b994a3
DG
2405 rcu_register_thread();
2406
1fc79fb4
MD
2407 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2408
9ce5646a
MD
2409 health_code_update();
2410
d88aee68 2411 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
43c34bc3 2412 if (data_ht == NULL) {
04bb2b64 2413 /* ENOMEM at this point. Better to bail out. */
43c34bc3
DG
2414 goto end;
2415 }
2416
4df6c8cb
MD
2417 local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
2418 if (local_stream == NULL) {
2419 PERROR("local_stream malloc");
2420 goto end;
2421 }
3bd1e081
MD
2422
2423 while (1) {
9ce5646a
MD
2424 health_code_update();
2425
3bd1e081
MD
2426 high_prio = 0;
2427 num_hup = 0;
2428
2429 /*
e4421fec 2430 * the fds set has been updated, we need to update our
3bd1e081
MD
2431 * local array as well
2432 */
2433 pthread_mutex_lock(&consumer_data.lock);
2434 if (consumer_data.need_update) {
0e428499
DG
2435 free(pollfd);
2436 pollfd = NULL;
2437
2438 free(local_stream);
2439 local_stream = NULL;
3bd1e081 2440
50f8ae69 2441 /* allocate for all fds + 1 for the consumer_data_pipe */
effcf122 2442 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081 2443 if (pollfd == NULL) {
7a57cf92 2444 PERROR("pollfd malloc");
3bd1e081
MD
2445 pthread_mutex_unlock(&consumer_data.lock);
2446 goto end;
2447 }
2448
50f8ae69 2449 /* allocate for all fds + 1 for the consumer_data_pipe */
effcf122 2450 local_stream = zmalloc((consumer_data.stream_count + 1) *
747f8642 2451 sizeof(struct lttng_consumer_stream *));
3bd1e081 2452 if (local_stream == NULL) {
7a57cf92 2453 PERROR("local_stream malloc");
3bd1e081
MD
2454 pthread_mutex_unlock(&consumer_data.lock);
2455 goto end;
2456 }
ffe60014 2457 ret = update_poll_array(ctx, &pollfd, local_stream,
43c34bc3 2458 data_ht);
3bd1e081
MD
2459 if (ret < 0) {
2460 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2461 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2462 pthread_mutex_unlock(&consumer_data.lock);
2463 goto end;
2464 }
2465 nb_fd = ret;
2466 consumer_data.need_update = 0;
2467 }
2468 pthread_mutex_unlock(&consumer_data.lock);
2469
4078b776
MD
2470 /* No FDs and consumer_quit, consumer_cleanup the thread */
2471 if (nb_fd == 0 && consumer_quit == 1) {
1fc79fb4 2472 err = 0; /* All is OK */
4078b776
MD
2473 goto end;
2474 }
3bd1e081 2475 /* poll on the array of fds */
88f2b785 2476 restart:
3bd1e081 2477 DBG("polling on %d fd", nb_fd + 1);
9ce5646a 2478 health_poll_entry();
cb365c03 2479 num_rdy = poll(pollfd, nb_fd + 1, -1);
9ce5646a 2480 health_poll_exit();
3bd1e081
MD
2481 DBG("poll num_rdy : %d", num_rdy);
2482 if (num_rdy == -1) {
88f2b785
MD
2483 /*
2484 * Restart interrupted system call.
2485 */
2486 if (errno == EINTR) {
2487 goto restart;
2488 }
7a57cf92 2489 PERROR("Poll error");
f73fabfd 2490 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2491 goto end;
2492 } else if (num_rdy == 0) {
2493 DBG("Polling thread timed out");
2494 goto end;
2495 }
2496
3bd1e081 2497 /*
50f8ae69 2498 * If the consumer_data_pipe triggered poll go directly to the
00e2e675
DG
2499 * beginning of the loop to update the array. We want to prioritize
2500 * array update over low-priority reads.
3bd1e081 2501 */
509bb1cf 2502 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
ab30f567 2503 ssize_t pipe_readlen;
04fdd819 2504
50f8ae69 2505 DBG("consumer_data_pipe wake up");
acdb9057
DG
2506 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2507 &new_stream, sizeof(new_stream));
23f5f35d 2508 if (pipe_readlen < 0) {
6a00837f 2509 ERR("Consumer data pipe ret %zd", pipe_readlen);
23f5f35d
DG
2510 /* Continue so we can at least handle the current stream(s). */
2511 continue;
2512 }
c869f647
DG
2513
2514 /*
2515 * If the stream is NULL, just ignore it. It's also possible that
2516 * the sessiond poll thread changed the consumer_quit state and is
2517 * waking us up to test it.
2518 */
2519 if (new_stream == NULL) {
8994307f 2520 validate_endpoint_status_data_stream();
c869f647
DG
2521 continue;
2522 }
2523
c869f647 2524 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2525 continue;
2526 }
2527
2528 /* Take care of high priority channels first. */
2529 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2530 health_code_update();
2531
9617607b
DG
2532 if (local_stream[i] == NULL) {
2533 continue;
2534 }
fb3a43a9 2535 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2536 DBG("Urgent read on fd %d", pollfd[i].fd);
2537 high_prio = 1;
4078b776 2538 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 2539 /* it's ok to have an unavailable sub-buffer */
b64403e3 2540 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2541 /* Clean the stream and free it. */
2542 consumer_del_stream(local_stream[i], data_ht);
9617607b 2543 local_stream[i] = NULL;
4078b776
MD
2544 } else if (len > 0) {
2545 local_stream[i]->data_read = 1;
d41f73b7 2546 }
3bd1e081
MD
2547 }
2548 }
2549
4078b776
MD
2550 /*
2551 * If we read high prio channel in this loop, try again
2552 * for more high prio data.
2553 */
2554 if (high_prio) {
3bd1e081
MD
2555 continue;
2556 }
2557
2558 /* Take care of low priority channels. */
4078b776 2559 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2560 health_code_update();
2561
9617607b
DG
2562 if (local_stream[i] == NULL) {
2563 continue;
2564 }
4078b776
MD
2565 if ((pollfd[i].revents & POLLIN) ||
2566 local_stream[i]->hangup_flush_done) {
4078b776
MD
2567 DBG("Normal read on fd %d", pollfd[i].fd);
2568 len = ctx->on_buffer_ready(local_stream[i], ctx);
2569 /* it's ok to have an unavailable sub-buffer */
b64403e3 2570 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2571 /* Clean the stream and free it. */
2572 consumer_del_stream(local_stream[i], data_ht);
9617607b 2573 local_stream[i] = NULL;
4078b776
MD
2574 } else if (len > 0) {
2575 local_stream[i]->data_read = 1;
2576 }
2577 }
2578 }
2579
2580 /* Handle hangup and errors */
2581 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2582 health_code_update();
2583
9617607b
DG
2584 if (local_stream[i] == NULL) {
2585 continue;
2586 }
4078b776
MD
2587 if (!local_stream[i]->hangup_flush_done
2588 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2589 && (consumer_data.type == LTTNG_CONSUMER32_UST
2590 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2591 DBG("fd %d is hup|err|nval. Attempting flush and read.",
9617607b 2592 pollfd[i].fd);
4078b776
MD
2593 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2594 /* Attempt read again, for the data we just flushed. */
2595 local_stream[i]->data_read = 1;
2596 }
2597 /*
2598 * If the poll flag is HUP/ERR/NVAL and we have
2599 * read no data in this pass, we can remove the
2600 * stream from its hash table.
2601 */
2602 if ((pollfd[i].revents & POLLHUP)) {
2603 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2604 if (!local_stream[i]->data_read) {
43c34bc3 2605 consumer_del_stream(local_stream[i], data_ht);
9617607b 2606 local_stream[i] = NULL;
4078b776
MD
2607 num_hup++;
2608 }
2609 } else if (pollfd[i].revents & POLLERR) {
2610 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2611 if (!local_stream[i]->data_read) {
43c34bc3 2612 consumer_del_stream(local_stream[i], data_ht);
9617607b 2613 local_stream[i] = NULL;
4078b776
MD
2614 num_hup++;
2615 }
2616 } else if (pollfd[i].revents & POLLNVAL) {
2617 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2618 if (!local_stream[i]->data_read) {
43c34bc3 2619 consumer_del_stream(local_stream[i], data_ht);
9617607b 2620 local_stream[i] = NULL;
4078b776 2621 num_hup++;
3bd1e081
MD
2622 }
2623 }
9617607b
DG
2624 if (local_stream[i] != NULL) {
2625 local_stream[i]->data_read = 0;
2626 }
3bd1e081
MD
2627 }
2628 }
1fc79fb4
MD
2629 /* All is OK */
2630 err = 0;
3bd1e081
MD
2631end:
2632 DBG("polling thread exiting");
0e428499
DG
2633 free(pollfd);
2634 free(local_stream);
fb3a43a9
DG
2635
2636 /*
2637 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2638 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2639 * read side of the pipe. If we close them both, epoll_wait strangely does
2640 * not return and could create a endless wait period if the pipe is the
2641 * only tracked fd in the poll set. The thread will take care of closing
2642 * the read side.
fb3a43a9 2643 */
13886d2d 2644 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
fb3a43a9 2645
04bb2b64 2646 destroy_data_stream_ht(data_ht);
43c34bc3 2647
1fc79fb4
MD
2648 if (err) {
2649 health_error();
2650 ERR("Health error occurred in %s", __func__);
2651 }
2652 health_unregister(health_consumerd);
2653
e7b994a3 2654 rcu_unregister_thread();
3bd1e081
MD
2655 return NULL;
2656}
2657
d8ef542d
MD
2658/*
2659 * Close wake-up end of each stream belonging to the channel. This will
2660 * allow the poll() on the stream read-side to detect when the
2661 * write-side (application) finally closes them.
2662 */
2663static
2664void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
2665{
2666 struct lttng_ht *ht;
2667 struct lttng_consumer_stream *stream;
2668 struct lttng_ht_iter iter;
2669
2670 ht = consumer_data.stream_per_chan_id_ht;
2671
2672 rcu_read_lock();
2673 cds_lfht_for_each_entry_duplicate(ht->ht,
2674 ht->hash_fct(&channel->key, lttng_ht_seed),
2675 ht->match_fct, &channel->key,
2676 &iter.iter, stream, node_channel_id.node) {
f2ad556d
MD
2677 /*
2678 * Protect against teardown with mutex.
2679 */
2680 pthread_mutex_lock(&stream->lock);
2681 if (cds_lfht_is_node_deleted(&stream->node.node)) {
2682 goto next;
2683 }
d8ef542d
MD
2684 switch (consumer_data.type) {
2685 case LTTNG_CONSUMER_KERNEL:
2686 break;
2687 case LTTNG_CONSUMER32_UST:
2688 case LTTNG_CONSUMER64_UST:
2689 /*
2690 * Note: a mutex is taken internally within
2691 * liblttng-ust-ctl to protect timer wakeup_fd
2692 * use from concurrent close.
2693 */
2694 lttng_ustconsumer_close_stream_wakeup(stream);
2695 break;
2696 default:
2697 ERR("Unknown consumer_data type");
2698 assert(0);
2699 }
f2ad556d
MD
2700 next:
2701 pthread_mutex_unlock(&stream->lock);
d8ef542d
MD
2702 }
2703 rcu_read_unlock();
2704}
2705
2706static void destroy_channel_ht(struct lttng_ht *ht)
2707{
2708 struct lttng_ht_iter iter;
2709 struct lttng_consumer_channel *channel;
2710 int ret;
2711
2712 if (ht == NULL) {
2713 return;
2714 }
2715
2716 rcu_read_lock();
2717 cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
2718 ret = lttng_ht_del(ht, &iter);
2719 assert(ret != 0);
2720 }
2721 rcu_read_unlock();
2722
2723 lttng_ht_destroy(ht);
2724}
2725
2726/*
2727 * This thread polls the channel fds to detect when they are being
2728 * closed. It closes all related streams if the channel is detected as
2729 * closed. It is currently only used as a shim layer for UST because the
2730 * consumerd needs to keep the per-stream wakeup end of pipes open for
2731 * periodical flush.
2732 */
2733void *consumer_thread_channel_poll(void *data)
2734{
1fc79fb4 2735 int ret, i, pollfd, err = -1;
d8ef542d
MD
2736 uint32_t revents, nb_fd;
2737 struct lttng_consumer_channel *chan = NULL;
2738 struct lttng_ht_iter iter;
2739 struct lttng_ht_node_u64 *node;
2740 struct lttng_poll_event events;
2741 struct lttng_consumer_local_data *ctx = data;
2742 struct lttng_ht *channel_ht;
2743
2744 rcu_register_thread();
2745
1fc79fb4
MD
2746 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
2747
9ce5646a
MD
2748 health_code_update();
2749
d8ef542d
MD
2750 channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2751 if (!channel_ht) {
2752 /* ENOMEM at this point. Better to bail out. */
2753 goto end_ht;
2754 }
2755
2756 DBG("Thread channel poll started");
2757
2758 /* Size is set to 1 for the consumer_channel pipe */
2759 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2760 if (ret < 0) {
2761 ERR("Poll set creation failed");
2762 goto end_poll;
2763 }
2764
2765 ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
2766 if (ret < 0) {
2767 goto end;
2768 }
2769
2770 /* Main loop */
2771 DBG("Channel main loop started");
2772
2773 while (1) {
9ce5646a
MD
2774 health_code_update();
2775
d8ef542d
MD
2776 /* Only the channel pipe is set */
2777 if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
1fc79fb4 2778 err = 0; /* All is OK */
d8ef542d
MD
2779 goto end;
2780 }
2781
2782restart:
2783 DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
9ce5646a 2784 health_poll_entry();
d8ef542d 2785 ret = lttng_poll_wait(&events, -1);
9ce5646a 2786 health_poll_exit();
d8ef542d
MD
2787 DBG("Channel event catched in thread");
2788 if (ret < 0) {
2789 if (errno == EINTR) {
2790 ERR("Poll EINTR catched");
2791 goto restart;
2792 }
2793 goto end;
2794 }
2795
2796 nb_fd = ret;
2797
2798 /* From here, the event is a channel wait fd */
2799 for (i = 0; i < nb_fd; i++) {
9ce5646a
MD
2800 health_code_update();
2801
d8ef542d
MD
2802 revents = LTTNG_POLL_GETEV(&events, i);
2803 pollfd = LTTNG_POLL_GETFD(&events, i);
2804
2805 /* Just don't waste time if no returned events for the fd */
2806 if (!revents) {
2807 continue;
2808 }
2809 if (pollfd == ctx->consumer_channel_pipe[0]) {
2810 if (revents & (LPOLLERR | LPOLLHUP)) {
2811 DBG("Channel thread pipe hung up");
2812 /*
2813 * Remove the pipe from the poll set and continue the loop
2814 * since their might be data to consume.
2815 */
2816 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
2817 continue;
2818 } else if (revents & LPOLLIN) {
2819 enum consumer_channel_action action;
a0cbdd2e 2820 uint64_t key;
d8ef542d 2821
a0cbdd2e 2822 ret = read_channel_pipe(ctx, &chan, &key, &action);
d8ef542d
MD
2823 if (ret <= 0) {
2824 ERR("Error reading channel pipe");
2825 continue;
2826 }
2827
2828 switch (action) {
2829 case CONSUMER_CHANNEL_ADD:
2830 DBG("Adding channel %d to poll set",
2831 chan->wait_fd);
2832
2833 lttng_ht_node_init_u64(&chan->wait_fd_node,
2834 chan->wait_fd);
c7260a81 2835 rcu_read_lock();
d8ef542d
MD
2836 lttng_ht_add_unique_u64(channel_ht,
2837 &chan->wait_fd_node);
c7260a81 2838 rcu_read_unlock();
d8ef542d
MD
2839 /* Add channel to the global poll events list */
2840 lttng_poll_add(&events, chan->wait_fd,
2841 LPOLLIN | LPOLLPRI);
2842 break;
a0cbdd2e
MD
2843 case CONSUMER_CHANNEL_DEL:
2844 {
f2a444f1
DG
2845 struct lttng_consumer_stream *stream, *stmp;
2846
c7260a81 2847 rcu_read_lock();
a0cbdd2e
MD
2848 chan = consumer_find_channel(key);
2849 if (!chan) {
c7260a81 2850 rcu_read_unlock();
a0cbdd2e
MD
2851 ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
2852 break;
2853 }
2854 lttng_poll_del(&events, chan->wait_fd);
f623cc0b 2855 iter.iter.node = &chan->wait_fd_node.node;
a0cbdd2e
MD
2856 ret = lttng_ht_del(channel_ht, &iter);
2857 assert(ret == 0);
2858 consumer_close_channel_streams(chan);
2859
f2a444f1
DG
2860 switch (consumer_data.type) {
2861 case LTTNG_CONSUMER_KERNEL:
2862 break;
2863 case LTTNG_CONSUMER32_UST:
2864 case LTTNG_CONSUMER64_UST:
2865 /* Delete streams that might have been left in the stream list. */
2866 cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
2867 send_node) {
9ce5646a
MD
2868 health_code_update();
2869
f2a444f1
DG
2870 cds_list_del(&stream->send_node);
2871 lttng_ustconsumer_del_stream(stream);
2872 uatomic_sub(&stream->chan->refcount, 1);
2873 assert(&chan->refcount);
2874 free(stream);
2875 }
2876 break;
2877 default:
2878 ERR("Unknown consumer_data type");
2879 assert(0);
2880 }
2881
a0cbdd2e
MD
2882 /*
2883 * Release our own refcount. Force channel deletion even if
2884 * streams were not initialized.
2885 */
2886 if (!uatomic_sub_return(&chan->refcount, 1)) {
2887 consumer_del_channel(chan);
2888 }
c7260a81 2889 rcu_read_unlock();
a0cbdd2e
MD
2890 goto restart;
2891 }
d8ef542d
MD
2892 case CONSUMER_CHANNEL_QUIT:
2893 /*
2894 * Remove the pipe from the poll set and continue the loop
2895 * since their might be data to consume.
2896 */
2897 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
2898 continue;
2899 default:
2900 ERR("Unknown action");
2901 break;
2902 }
2903 }
2904
2905 /* Handle other stream */
2906 continue;
2907 }
2908
2909 rcu_read_lock();
2910 {
2911 uint64_t tmp_id = (uint64_t) pollfd;
2912
2913 lttng_ht_lookup(channel_ht, &tmp_id, &iter);
2914 }
2915 node = lttng_ht_iter_get_node_u64(&iter);
2916 assert(node);
2917
2918 chan = caa_container_of(node, struct lttng_consumer_channel,
2919 wait_fd_node);
2920
2921 /* Check for error event */
2922 if (revents & (LPOLLERR | LPOLLHUP)) {
2923 DBG("Channel fd %d is hup|err.", pollfd);
2924
2925 lttng_poll_del(&events, chan->wait_fd);
2926 ret = lttng_ht_del(channel_ht, &iter);
2927 assert(ret == 0);
2928 consumer_close_channel_streams(chan);
f2ad556d
MD
2929
2930 /* Release our own refcount */
2931 if (!uatomic_sub_return(&chan->refcount, 1)
2932 && !uatomic_read(&chan->nb_init_stream_left)) {
2933 consumer_del_channel(chan);
2934 }
d8ef542d
MD
2935 }
2936
2937 /* Release RCU lock for the channel looked up */
2938 rcu_read_unlock();
2939 }
2940 }
2941
1fc79fb4
MD
2942 /* All is OK */
2943 err = 0;
d8ef542d
MD
2944end:
2945 lttng_poll_clean(&events);
2946end_poll:
2947 destroy_channel_ht(channel_ht);
2948end_ht:
2949 DBG("Channel poll thread exiting");
1fc79fb4
MD
2950 if (err) {
2951 health_error();
2952 ERR("Health error occurred in %s", __func__);
2953 }
2954 health_unregister(health_consumerd);
d8ef542d
MD
2955 rcu_unregister_thread();
2956 return NULL;
2957}
2958
331744e3
JD
2959static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
2960 struct pollfd *sockpoll, int client_socket)
2961{
2962 int ret;
2963
2964 assert(ctx);
2965 assert(sockpoll);
2966
2967 if (lttng_consumer_poll_socket(sockpoll) < 0) {
2968 ret = -1;
2969 goto error;
2970 }
2971 DBG("Metadata connection on client_socket");
2972
2973 /* Blocking call, waiting for transmission */
2974 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
2975 if (ctx->consumer_metadata_socket < 0) {
2976 WARN("On accept metadata");
2977 ret = -1;
2978 goto error;
2979 }
2980 ret = 0;
2981
2982error:
2983 return ret;
2984}
2985
3bd1e081
MD
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 *
 * Thread entry point: creates the command unix socket, notifies the session
 * daemon it is ready, accepts the command and metadata connections, then
 * loops receiving commands until a STOP, a disconnect or a quit request.
 * Always returns NULL; errors are reported through the health subsystem.
 */
void *consumer_thread_sessiond_poll(void *data)
{
	/* err != 0 at exit is reported as a health error; cleared on clean paths. */
	int sock = -1, client_socket, ret, err = -1;
	/*
	 * structure to poll for incoming data on communication socket avoids
	 * making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = data;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	health_code_update();

	/* Remove any stale socket file before binding a fresh one. */
	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret < 0) {
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	/* Mark closed so the cleanup path below does not close it twice. */
	client_socket = -1;

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* Main command loop: one lttng_consumer_recv_cmd() per wakeup. */
	while (1) {
		health_code_update();

		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret < 0) {
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret == -ENOENT) {
			DBG("Received STOP command");
			goto end;
		}
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (consumer_quit) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0;	/* All is OK */
			goto end;
		}
		DBG("received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_metadata();

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	/* NOTE(review): consumer_quit is a plain global flag shared with the
	 * other consumer threads — presumably a best-effort signal; confirm
	 * the intended memory-ordering guarantees. */
	consumer_quit = 1;

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	/* Also wake the channel thread so it observes the quit request. */
	notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return NULL;
}
d41f73b7 3147
4078b776 3148ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
3149 struct lttng_consumer_local_data *ctx)
3150{
74251bb8
DG
3151 ssize_t ret;
3152
3153 pthread_mutex_lock(&stream->lock);
94d49140
JD
3154 if (stream->metadata_flag) {
3155 pthread_mutex_lock(&stream->metadata_rdv_lock);
3156 }
74251bb8 3157
d41f73b7
MD
3158 switch (consumer_data.type) {
3159 case LTTNG_CONSUMER_KERNEL:
74251bb8
DG
3160 ret = lttng_kconsumer_read_subbuffer(stream, ctx);
3161 break;
7753dea8
MD
3162 case LTTNG_CONSUMER32_UST:
3163 case LTTNG_CONSUMER64_UST:
74251bb8
DG
3164 ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
3165 break;
d41f73b7
MD
3166 default:
3167 ERR("Unknown consumer_data type");
3168 assert(0);
74251bb8
DG
3169 ret = -ENOSYS;
3170 break;
d41f73b7 3171 }
74251bb8 3172
94d49140
JD
3173 if (stream->metadata_flag) {
3174 pthread_cond_broadcast(&stream->metadata_rdv);
3175 pthread_mutex_unlock(&stream->metadata_rdv_lock);
3176 }
74251bb8
DG
3177 pthread_mutex_unlock(&stream->lock);
3178 return ret;
d41f73b7
MD
3179}
3180
3181int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3182{
3183 switch (consumer_data.type) {
3184 case LTTNG_CONSUMER_KERNEL:
3185 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
3186 case LTTNG_CONSUMER32_UST:
3187 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
3188 return lttng_ustconsumer_on_recv_stream(stream);
3189 default:
3190 ERR("Unknown consumer_data type");
3191 assert(0);
3192 return -ENOSYS;
3193 }
3194}
e4421fec
DG
3195
/*
 * Allocate and set consumer data hash tables.
 *
 * Called once at consumer startup before any stream/channel/relayd object is
 * tracked.
 *
 * NOTE(review): lttng_ht_new() return values are not checked here, so an
 * allocation failure would leave a NULL table behind — presumably callers
 * treat init-time allocation as infallible; confirm.
 */
void lttng_consumer_init(void)
{
	/* All tables are keyed by 64-bit ids (channel key, net index, ...). */
	consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
}
7735ef9e
DG
3206
3207/*
3208 * Process the ADD_RELAYD command receive by a consumer.
3209 *
3210 * This will create a relayd socket pair and add it to the relayd hash table.
3211 * The caller MUST acquire a RCU read side lock before calling it.
3212 */
da009f2c 3213int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
7735ef9e 3214 struct lttng_consumer_local_data *ctx, int sock,
6151a90f 3215 struct pollfd *consumer_sockpoll,
d3e2ba59
JD
3216 struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
3217 uint64_t relayd_session_id)
7735ef9e 3218{
cd2b09ed 3219 int fd = -1, ret = -1, relayd_created = 0;
f50f23d9 3220 enum lttng_error_code ret_code = LTTNG_OK;
d4298c99 3221 struct consumer_relayd_sock_pair *relayd = NULL;
7735ef9e 3222
6151a90f
JD
3223 assert(ctx);
3224 assert(relayd_sock);
3225
da009f2c 3226 DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
7735ef9e
DG
3227
3228 /* Get relayd reference if exists. */
3229 relayd = consumer_find_relayd(net_seq_idx);
3230 if (relayd == NULL) {
da009f2c 3231 assert(sock_type == LTTNG_STREAM_CONTROL);
7735ef9e
DG
3232 /* Not found. Allocate one. */
3233 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
3234 if (relayd == NULL) {
0d08d75e 3235 ret = -ENOMEM;
618a6a28
MD
3236 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
3237 goto error;
0d08d75e 3238 } else {
30319bcb 3239 relayd->sessiond_session_id = sessiond_id;
0d08d75e 3240 relayd_created = 1;
7735ef9e 3241 }
0d08d75e
DG
3242
3243 /*
3244 * This code path MUST continue to the consumer send status message to
3245 * we can notify the session daemon and continue our work without
3246 * killing everything.
3247 */
da009f2c
MD
3248 } else {
3249 /*
3250 * relayd key should never be found for control socket.
3251 */
3252 assert(sock_type != LTTNG_STREAM_CONTROL);
0d08d75e
DG
3253 }
3254
3255 /* First send a status message before receiving the fds. */
618a6a28
MD
3256 ret = consumer_send_status_msg(sock, LTTNG_OK);
3257 if (ret < 0) {
0d08d75e 3258 /* Somehow, the session daemon is not responding anymore. */
618a6a28
MD
3259 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3260 goto error_nosignal;
7735ef9e
DG
3261 }
3262
3263 /* Poll on consumer socket. */
3264 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
0d08d75e 3265 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
7735ef9e 3266 ret = -EINTR;
618a6a28 3267 goto error_nosignal;
7735ef9e
DG
3268 }
3269
3270 /* Get relayd socket from session daemon */
3271 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
3272 if (ret != sizeof(fd)) {
7735ef9e 3273 ret = -1;
4028eeb9 3274 fd = -1; /* Just in case it gets set with an invalid value. */
0d08d75e
DG
3275
3276 /*
3277 * Failing to receive FDs might indicate a major problem such as
3278 * reaching a fd limit during the receive where the kernel returns a
3279 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
3280 * don't take any chances and stop everything.
3281 *
3282 * XXX: Feature request #558 will fix that and avoid this possible
3283 * issue when reaching the fd limit.
3284 */
3285 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
618a6a28 3286 ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
f50f23d9
DG
3287 goto error;
3288 }
3289
7735ef9e
DG
3290 /* Copy socket information and received FD */
3291 switch (sock_type) {
3292 case LTTNG_STREAM_CONTROL:
3293 /* Copy received lttcomm socket */
6151a90f
JD
3294 lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
3295 ret = lttcomm_create_sock(&relayd->control_sock.sock);
4028eeb9 3296 /* Handle create_sock error. */
f66c074c 3297 if (ret < 0) {
618a6a28 3298 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
4028eeb9 3299 goto error;
f66c074c 3300 }
da009f2c
MD
3301 /*
3302 * Close the socket created internally by
3303 * lttcomm_create_sock, so we can replace it by the one
3304 * received from sessiond.
3305 */
3306 if (close(relayd->control_sock.sock.fd)) {
3307 PERROR("close");
3308 }
7735ef9e
DG
3309
3310 /* Assign new file descriptor */
6151a90f 3311 relayd->control_sock.sock.fd = fd;
4b29f1ce 3312 fd = -1; /* For error path */
6151a90f
JD
3313 /* Assign version values. */
3314 relayd->control_sock.major = relayd_sock->major;
3315 relayd->control_sock.minor = relayd_sock->minor;
c5b6f4f0 3316
d3e2ba59 3317 relayd->relayd_session_id = relayd_session_id;
c5b6f4f0 3318
7735ef9e
DG
3319 break;
3320 case LTTNG_STREAM_DATA:
3321 /* Copy received lttcomm socket */
6151a90f
JD
3322 lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
3323 ret = lttcomm_create_sock(&relayd->data_sock.sock);
4028eeb9 3324 /* Handle create_sock error. */
f66c074c 3325 if (ret < 0) {
618a6a28 3326 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
4028eeb9 3327 goto error;
f66c074c 3328 }
da009f2c
MD
3329 /*
3330 * Close the socket created internally by
3331 * lttcomm_create_sock, so we can replace it by the one
3332 * received from sessiond.
3333 */
3334 if (close(relayd->data_sock.sock.fd)) {
3335 PERROR("close");
3336 }
7735ef9e
DG
3337
3338 /* Assign new file descriptor */
6151a90f 3339 relayd->data_sock.sock.fd = fd;
4b29f1ce 3340 fd = -1; /* for eventual error paths */
6151a90f
JD
3341 /* Assign version values. */
3342 relayd->data_sock.major = relayd_sock->major;
3343 relayd->data_sock.minor = relayd_sock->minor;
7735ef9e
DG
3344 break;
3345 default:
3346 ERR("Unknown relayd socket type (%d)", sock_type);
59e71485 3347 ret = -1;
618a6a28 3348 ret_code = LTTCOMM_CONSUMERD_FATAL;
7735ef9e
DG
3349 goto error;
3350 }
3351
d88aee68 3352 DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
7735ef9e
DG
3353 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
3354 relayd->net_seq_idx, fd);
3355
618a6a28
MD
3356 /* We successfully added the socket. Send status back. */
3357 ret = consumer_send_status_msg(sock, ret_code);
3358 if (ret < 0) {
3359 /* Somehow, the session daemon is not responding anymore. */
3360 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3361 goto error_nosignal;
3362 }
3363
7735ef9e
DG
3364 /*
3365 * Add relayd socket pair to consumer data hashtable. If object already
3366 * exists or on error, the function gracefully returns.
3367 */
d09e1200 3368 add_relayd(relayd);
7735ef9e
DG
3369
3370 /* All good! */
4028eeb9 3371 return 0;
7735ef9e
DG
3372
3373error:
618a6a28
MD
3374 if (consumer_send_status_msg(sock, ret_code) < 0) {
3375 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3376 }
3377
3378error_nosignal:
4028eeb9
DG
3379 /* Close received socket if valid. */
3380 if (fd >= 0) {
3381 if (close(fd)) {
3382 PERROR("close received socket");
3383 }
3384 }
cd2b09ed
DG
3385
3386 if (relayd_created) {
cd2b09ed
DG
3387 free(relayd);
3388 }
3389
7735ef9e
DG
3390 return ret;
3391}
ca22feea 3392
4e9a4686
DG
3393/*
3394 * Try to lock the stream mutex.
3395 *
3396 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
3397 */
3398static int stream_try_lock(struct lttng_consumer_stream *stream)
3399{
3400 int ret;
3401
3402 assert(stream);
3403
3404 /*
3405 * Try to lock the stream mutex. On failure, we know that the stream is
3406 * being used else where hence there is data still being extracted.
3407 */
3408 ret = pthread_mutex_trylock(&stream->lock);
3409 if (ret) {
3410 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
3411 ret = 0;
3412 goto end;
3413 }
3414
3415 ret = 1;
3416
3417end:
3418 return ret;
3419}
3420
f7079f67
DG
3421/*
3422 * Search for a relayd associated to the session id and return the reference.
3423 *
3424 * A rcu read side lock MUST be acquire before calling this function and locked
3425 * until the relayd object is no longer necessary.
3426 */
3427static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3428{
3429 struct lttng_ht_iter iter;
f7079f67 3430 struct consumer_relayd_sock_pair *relayd = NULL;
f7079f67
DG
3431
3432 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3433 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
3434 node.node) {
18261bd1
DG
3435 /*
3436 * Check by sessiond id which is unique here where the relayd session
3437 * id might not be when having multiple relayd.
3438 */
3439 if (relayd->sessiond_session_id == id) {
f7079f67 3440 /* Found the relayd. There can be only one per id. */
18261bd1 3441 goto found;
f7079f67
DG
3442 }
3443 }
3444
18261bd1
DG
3445 return NULL;
3446
3447found:
f7079f67
DG
3448 return relayd;
3449}
3450
ca22feea
DG
/*
 * Check if for a given session id there is still data needed to be extract
 * from the buffers.
 *
 * Walks every stream of the session (under consumer_data.lock and the RCU
 * read lock): a stream whose lock is contended, whose buffers still hold
 * data, or whose output file is still empty counts as pending. When a relayd
 * is associated to the session, the relayd is also queried per stream and for
 * in-flight data, bracketed by begin/end "data pending" commands.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = NULL;
	/* Tracer-specific "buffers non-empty?" predicate. */
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	rcu_read_lock();
	pthread_mutex_lock(&consumer_data.lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		assert(0);
	}

	/* Ease our life a bit */
	ht = consumer_data.stream_list_ht;

	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock,
				relayd->relayd_session_id);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			/* Communication error thus the relayd so no data pending. */
			goto data_not_pending;
		}
	}

	/* Visit every stream of this session (keyed by session id). */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&id, lttng_ht_seed),
			ht->match_fct, &id,
			&iter.iter, stream, node_session_id.node) {
		/* If this call fails, the stream is being used hence data pending. */
		ret = stream_try_lock(stream);
		if (!ret) {
			goto data_pending;
		}

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/*
			 * An empty output file is not valid. We need at least one packet
			 * generated per stream, even if it contains no event, so it
			 * contains at least one packet header.
			 */
			if (stream->output_written == 0) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		/* Relayd check */
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->metadata_flag) {
				/* Metadata streams use the quiescence check instead. */
				ret = relayd_quiescent_control(&relayd->control_sock,
						stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
						stream->relayd_stream_id,
						stream->next_net_seq_num - 1);
			}
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}
		pthread_mutex_unlock(&stream->lock);
	}

	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_end_data_pending(&relayd->control_sock,
				relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 1;
}
f50f23d9
DG
3589
3590/*
3591 * Send a ret code status message to the sessiond daemon.
3592 *
3593 * Return the sendmsg() return value.
3594 */
3595int consumer_send_status_msg(int sock, int ret_code)
3596{
3597 struct lttcomm_consumer_status_msg msg;
3598
3599 msg.ret_code = ret_code;
3600
3601 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3602}
ffe60014
DG
3603
3604/*
3605 * Send a channel status message to the sessiond daemon.
3606 *
3607 * Return the sendmsg() return value.
3608 */
3609int consumer_send_status_channel(int sock,
3610 struct lttng_consumer_channel *channel)
3611{
3612 struct lttcomm_consumer_status_channel msg;
3613
3614 assert(sock >= 0);
3615
3616 if (!channel) {
3617 msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
3618 } else {
3619 msg.ret_code = LTTNG_OK;
3620 msg.key = channel->key;
3621 msg.stream_count = channel->streams.count;
3622 }
3623
3624 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3625}
5c786ded
JD
3626
3627/*
3628 * Using a maximum stream size with the produced and consumed position of a
3629 * stream, computes the new consumed position to be as close as possible to the
3630 * maximum possible stream size.
3631 *
3632 * If maximum stream size is lower than the possible buffer size (produced -
3633 * consumed), the consumed_pos given is returned untouched else the new value
3634 * is returned.
3635 */
3636unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
3637 unsigned long produced_pos, uint64_t max_stream_size)
3638{
3639 if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
3640 /* Offset from the produced position to get the latest buffers. */
3641 return produced_pos - max_stream_size;
3642 }
3643
3644 return consumed_pos;
3645}
This page took 0.242391 seconds and 4 git commands to generate.