Change the UST event hash table match function
[lttng-tools.git] / src / common / consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
00e2e675 4 * 2012 - David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
d14d33bf
AM
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
3bd1e081 9 *
d14d33bf
AM
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
3bd1e081 14 *
d14d33bf
AM
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
77c7c900 30#include <inttypes.h>
3bd1e081 31
990570ed 32#include <common/common.h>
fb3a43a9
DG
33#include <common/utils.h>
34#include <common/compat/poll.h>
10a8a223 35#include <common/kernel-ctl/kernel-ctl.h>
00e2e675 36#include <common/sessiond-comm/relayd.h>
10a8a223
DG
37#include <common/sessiond-comm/sessiond-comm.h>
38#include <common/kernel-consumer/kernel-consumer.h>
00e2e675 39#include <common/relayd/relayd.h>
10a8a223
DG
40#include <common/ust-consumer/ust-consumer.h>
41
42#include "consumer.h"
3bd1e081
MD
43
44struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
45 .stream_count = 0,
46 .need_update = 1,
47 .type = LTTNG_CONSUMER_UNKNOWN,
48};
49
3bd1e081
MD
50/*
51 * Flag to inform the polling thread to quit when all fd hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds has hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
54 * polling threads.
55 */
a98dae5f 56volatile int consumer_quit;
3bd1e081 57
43c34bc3
DG
58/*
59 * The following two hash tables are visible by all threads which are separated
60 * in different source files.
61 *
62 * Global hash table containing respectively metadata and data streams. The
63 * stream element in this ht should only be updated by the metadata poll thread
64 * for the metadata and the data poll thread for the data.
65 */
a98dae5f
DG
66struct lttng_ht *metadata_ht;
67struct lttng_ht *data_ht;
43c34bc3 68
8994307f
DG
/*
 * Notify a thread pipe to poll back again. This usually means that some global
 * state has changed so we just send back the thread in a poll wait call.
 *
 * A NULL stream pointer is written on the write side of the pipe (wpipe). The
 * write is retried for as long as it is interrupted (EINTR); any other
 * failure is ignored.
 */
static void notify_thread_pipe(int wpipe)
{
	/* write() returns a ssize_t; keep the full width of the result. */
	ssize_t ret;
	struct lttng_consumer_stream *null_stream = NULL;

	do {
		ret = write(wpipe, &null_stream, sizeof(null_stream));
	} while (ret < 0 && errno == EINTR);
}
83
3bd1e081
MD
84/*
85 * Find a stream. The consumer_data.lock must be locked during this
86 * call.
87 */
8389e4f8
DG
88static struct lttng_consumer_stream *consumer_find_stream(int key,
89 struct lttng_ht *ht)
3bd1e081 90{
e4421fec
DG
91 struct lttng_ht_iter iter;
92 struct lttng_ht_node_ulong *node;
93 struct lttng_consumer_stream *stream = NULL;
3bd1e081 94
8389e4f8
DG
95 assert(ht);
96
7ad0a0cb 97 /* Negative keys are lookup failures */
7a57cf92 98 if (key < 0) {
7ad0a0cb 99 return NULL;
7a57cf92 100 }
e4421fec 101
6065ceec
DG
102 rcu_read_lock();
103
8389e4f8 104 lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
e4421fec
DG
105 node = lttng_ht_iter_get_node_ulong(&iter);
106 if (node != NULL) {
107 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 108 }
e4421fec 109
6065ceec
DG
110 rcu_read_unlock();
111
e4421fec 112 return stream;
3bd1e081
MD
113}
114
c869f647 115void consumer_steal_stream_key(int key, struct lttng_ht *ht)
7ad0a0cb
MD
116{
117 struct lttng_consumer_stream *stream;
118
04253271 119 rcu_read_lock();
8389e4f8 120 stream = consumer_find_stream(key, ht);
04253271 121 if (stream) {
7ad0a0cb 122 stream->key = -1;
04253271
MD
123 /*
124 * We don't want the lookup to match, but we still need
125 * to iterate on this stream when iterating over the hash table. Just
126 * change the node key.
127 */
128 stream->node.key = -1;
129 }
130 rcu_read_unlock();
7ad0a0cb
MD
131}
132
3bd1e081
MD
133static struct lttng_consumer_channel *consumer_find_channel(int key)
134{
e4421fec
DG
135 struct lttng_ht_iter iter;
136 struct lttng_ht_node_ulong *node;
137 struct lttng_consumer_channel *channel = NULL;
3bd1e081 138
7ad0a0cb 139 /* Negative keys are lookup failures */
7a57cf92 140 if (key < 0) {
7ad0a0cb 141 return NULL;
7a57cf92 142 }
e4421fec 143
6065ceec
DG
144 rcu_read_lock();
145
e4421fec
DG
146 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
147 &iter);
148 node = lttng_ht_iter_get_node_ulong(&iter);
149 if (node != NULL) {
150 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 151 }
e4421fec 152
6065ceec
DG
153 rcu_read_unlock();
154
e4421fec 155 return channel;
3bd1e081
MD
156}
157
7ad0a0cb
MD
158static void consumer_steal_channel_key(int key)
159{
160 struct lttng_consumer_channel *channel;
161
04253271 162 rcu_read_lock();
7ad0a0cb 163 channel = consumer_find_channel(key);
04253271 164 if (channel) {
7ad0a0cb 165 channel->key = -1;
04253271
MD
166 /*
167 * We don't want the lookup to match, but we still need
168 * to iterate on this channel when iterating over the hash table. Just
169 * change the node key.
170 */
171 channel->node.key = -1;
172 }
173 rcu_read_unlock();
7ad0a0cb
MD
174}
175
702b1ea4
MD
176static
177void consumer_free_stream(struct rcu_head *head)
178{
179 struct lttng_ht_node_ulong *node =
180 caa_container_of(head, struct lttng_ht_node_ulong, head);
181 struct lttng_consumer_stream *stream =
182 caa_container_of(node, struct lttng_consumer_stream, node);
183
184 free(stream);
185}
186
00e2e675
DG
187/*
188 * RCU protected relayd socket pair free.
189 */
190static void consumer_rcu_free_relayd(struct rcu_head *head)
191{
192 struct lttng_ht_node_ulong *node =
193 caa_container_of(head, struct lttng_ht_node_ulong, head);
194 struct consumer_relayd_sock_pair *relayd =
195 caa_container_of(node, struct consumer_relayd_sock_pair, node);
196
8994307f
DG
197 /*
198 * Close all sockets. This is done in the call RCU since we don't want the
199 * socket fds to be reassigned thus potentially creating bad state of the
200 * relayd object.
201 *
202 * We do not have to lock the control socket mutex here since at this stage
203 * there is no one referencing to this relayd object.
204 */
205 (void) relayd_close(&relayd->control_sock);
206 (void) relayd_close(&relayd->data_sock);
207
00e2e675
DG
208 free(relayd);
209}
210
211/*
212 * Destroy and free relayd socket pair object.
213 *
214 * This function MUST be called with the consumer_data lock acquired.
215 */
d09e1200 216static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
217{
218 int ret;
219 struct lttng_ht_iter iter;
220
173af62f
DG
221 if (relayd == NULL) {
222 return;
223 }
224
00e2e675
DG
225 DBG("Consumer destroy and close relayd socket pair");
226
227 iter.iter.node = &relayd->node.node;
228 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
173af62f 229 if (ret != 0) {
8994307f 230 /* We assume the relayd is being or is destroyed */
173af62f
DG
231 return;
232 }
00e2e675 233
00e2e675
DG
234 /* RCU free() call */
235 call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
236}
237
8994307f
DG
238/*
239 * Update the end point status of all streams having the given network sequence
240 * index (relayd index).
241 *
242 * It's atomically set without having the stream mutex locked which is fine
243 * because we handle the write/read race with a pipe wakeup for each thread.
244 */
245static void update_endpoint_status_by_netidx(int net_seq_idx,
246 enum consumer_endpoint_status status)
247{
248 struct lttng_ht_iter iter;
249 struct lttng_consumer_stream *stream;
250
251 DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
252
253 rcu_read_lock();
254
255 /* Let's begin with metadata */
256 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
257 if (stream->net_seq_idx == net_seq_idx) {
258 uatomic_set(&stream->endpoint_status, status);
259 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
260 }
261 }
262
263 /* Follow up by the data streams */
264 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
265 if (stream->net_seq_idx == net_seq_idx) {
266 uatomic_set(&stream->endpoint_status, status);
267 DBG("Delete flag set to data stream %d", stream->wait_fd);
268 }
269 }
270 rcu_read_unlock();
271}
272
273/*
274 * Cleanup a relayd object by flagging every associated streams for deletion,
275 * destroying the object meaning removing it from the relayd hash table,
276 * closing the sockets and freeing the memory in a RCU call.
277 *
278 * If a local data context is available, notify the threads that the streams'
279 * state have changed.
280 */
281static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
282 struct lttng_consumer_local_data *ctx)
283{
284 int netidx;
285
286 assert(relayd);
287
9617607b
DG
288 DBG("Cleaning up relayd sockets");
289
8994307f
DG
290 /* Save the net sequence index before destroying the object */
291 netidx = relayd->net_seq_idx;
292
293 /*
294 * Delete the relayd from the relayd hash table, close the sockets and free
295 * the object in a RCU call.
296 */
297 destroy_relayd(relayd);
298
299 /* Set inactive endpoint to all streams */
300 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
301
302 /*
303 * With a local data context, notify the threads that the streams' state
304 * have changed. The write() action on the pipe acts as an "implicit"
305 * memory barrier ordering the updates of the end point status from the
306 * read of this status which happens AFTER receiving this notify.
307 */
308 if (ctx) {
309 notify_thread_pipe(ctx->consumer_data_pipe[1]);
310 notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
311 }
312}
313
a6ba4fe1
DG
314/*
315 * Flag a relayd socket pair for destruction. Destroy it if the refcount
316 * reaches zero.
317 *
318 * RCU read side lock MUST be aquired before calling this function.
319 */
320void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
321{
322 assert(relayd);
323
324 /* Set destroy flag for this object */
325 uatomic_set(&relayd->destroy_flag, 1);
326
327 /* Destroy the relayd if refcount is 0 */
328 if (uatomic_read(&relayd->refcount) == 0) {
d09e1200 329 destroy_relayd(relayd);
a6ba4fe1
DG
330 }
331}
332
3bd1e081
MD
333/*
334 * Remove a stream from the global list protected by a mutex. This
335 * function is also responsible for freeing its data structures.
336 */
e316aad5
DG
337void consumer_del_stream(struct lttng_consumer_stream *stream,
338 struct lttng_ht *ht)
3bd1e081
MD
339{
340 int ret;
e4421fec 341 struct lttng_ht_iter iter;
3bd1e081 342 struct lttng_consumer_channel *free_chan = NULL;
00e2e675
DG
343 struct consumer_relayd_sock_pair *relayd;
344
345 assert(stream);
3bd1e081 346
8994307f
DG
347 DBG("Consumer del stream %d", stream->wait_fd);
348
e316aad5
DG
349 if (ht == NULL) {
350 /* Means the stream was allocated but not successfully added */
351 goto free_stream;
352 }
353
8994307f 354 pthread_mutex_lock(&stream->lock);
3bd1e081
MD
355 pthread_mutex_lock(&consumer_data.lock);
356
357 switch (consumer_data.type) {
358 case LTTNG_CONSUMER_KERNEL:
359 if (stream->mmap_base != NULL) {
360 ret = munmap(stream->mmap_base, stream->mmap_len);
361 if (ret != 0) {
7a57cf92 362 PERROR("munmap");
3bd1e081
MD
363 }
364 }
365 break;
7753dea8
MD
366 case LTTNG_CONSUMER32_UST:
367 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
368 lttng_ustconsumer_del_stream(stream);
369 break;
370 default:
371 ERR("Unknown consumer_data type");
372 assert(0);
373 goto end;
374 }
375
6065ceec 376 rcu_read_lock();
04253271 377 iter.iter.node = &stream->node.node;
e316aad5 378 ret = lttng_ht_del(ht, &iter);
04253271 379 assert(!ret);
ca22feea
DG
380
381 /* Remove node session id from the consumer_data stream ht */
382 iter.iter.node = &stream->node_session_id.node;
383 ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
384 assert(!ret);
6065ceec
DG
385 rcu_read_unlock();
386
50f8ae69 387 assert(consumer_data.stream_count > 0);
3bd1e081 388 consumer_data.stream_count--;
50f8ae69 389
3bd1e081 390 if (stream->out_fd >= 0) {
4c462e79
MD
391 ret = close(stream->out_fd);
392 if (ret) {
393 PERROR("close");
394 }
3bd1e081 395 }
b5c5fc29 396 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
4c462e79
MD
397 ret = close(stream->wait_fd);
398 if (ret) {
399 PERROR("close");
400 }
3bd1e081 401 }
2c1dd183 402 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
4c462e79
MD
403 ret = close(stream->shm_fd);
404 if (ret) {
405 PERROR("close");
406 }
3bd1e081 407 }
00e2e675
DG
408
409 /* Check and cleanup relayd */
b0b335c8 410 rcu_read_lock();
00e2e675
DG
411 relayd = consumer_find_relayd(stream->net_seq_idx);
412 if (relayd != NULL) {
b0b335c8
MD
413 uatomic_dec(&relayd->refcount);
414 assert(uatomic_read(&relayd->refcount) >= 0);
173af62f 415
3f8e211f
DG
416 /* Closing streams requires to lock the control socket. */
417 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
173af62f
DG
418 ret = relayd_send_close_stream(&relayd->control_sock,
419 stream->relayd_stream_id,
420 stream->next_net_seq_num - 1);
3f8e211f 421 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
173af62f 422 if (ret < 0) {
a4b92340
DG
423 DBG("Unable to close stream on the relayd. Continuing");
424 /*
425 * Continue here. There is nothing we can do for the relayd.
426 * Chances are that the relayd has closed the socket so we just
427 * continue cleaning up.
428 */
173af62f
DG
429 }
430
431 /* Both conditions are met, we destroy the relayd. */
432 if (uatomic_read(&relayd->refcount) == 0 &&
433 uatomic_read(&relayd->destroy_flag)) {
d09e1200 434 destroy_relayd(relayd);
00e2e675 435 }
00e2e675 436 }
b0b335c8 437 rcu_read_unlock();
00e2e675 438
c30aaa51
MD
439 uatomic_dec(&stream->chan->refcount);
440 if (!uatomic_read(&stream->chan->refcount)
441 && !uatomic_read(&stream->chan->nb_init_streams)) {
3bd1e081 442 free_chan = stream->chan;
00e2e675
DG
443 }
444
3bd1e081
MD
445end:
446 consumer_data.need_update = 1;
447 pthread_mutex_unlock(&consumer_data.lock);
8994307f 448 pthread_mutex_unlock(&stream->lock);
3bd1e081 449
c30aaa51 450 if (free_chan) {
3bd1e081 451 consumer_del_channel(free_chan);
c30aaa51 452 }
e316aad5
DG
453
454free_stream:
455 call_rcu(&stream->node.head, consumer_free_stream);
3bd1e081
MD
456}
457
458struct lttng_consumer_stream *consumer_allocate_stream(
459 int channel_key, int stream_key,
460 int shm_fd, int wait_fd,
461 enum lttng_consumer_stream_state state,
462 uint64_t mmap_len,
463 enum lttng_event_output output,
6df2e2c9
MD
464 const char *path_name,
465 uid_t uid,
00e2e675
DG
466 gid_t gid,
467 int net_index,
c80048c6 468 int metadata_flag,
53632229 469 uint64_t session_id,
c80048c6 470 int *alloc_ret)
3bd1e081
MD
471{
472 struct lttng_consumer_stream *stream;
3bd1e081 473
effcf122 474 stream = zmalloc(sizeof(*stream));
3bd1e081 475 if (stream == NULL) {
7a57cf92 476 PERROR("malloc struct lttng_consumer_stream");
c80048c6 477 *alloc_ret = -ENOMEM;
7a57cf92 478 goto end;
3bd1e081 479 }
7a57cf92
DG
480
481 /*
482 * Get stream's channel reference. Needed when adding the stream to the
483 * global hash table.
484 */
3bd1e081
MD
485 stream->chan = consumer_find_channel(channel_key);
486 if (!stream->chan) {
c80048c6 487 *alloc_ret = -ENOENT;
7a57cf92 488 ERR("Unable to find channel for stream %d", stream_key);
c80048c6 489 goto error;
3bd1e081 490 }
e316aad5 491
3bd1e081
MD
492 stream->key = stream_key;
493 stream->shm_fd = shm_fd;
494 stream->wait_fd = wait_fd;
495 stream->out_fd = -1;
496 stream->out_fd_offset = 0;
497 stream->state = state;
498 stream->mmap_len = mmap_len;
499 stream->mmap_base = NULL;
500 stream->output = output;
6df2e2c9
MD
501 stream->uid = uid;
502 stream->gid = gid;
00e2e675
DG
503 stream->net_seq_idx = net_index;
504 stream->metadata_flag = metadata_flag;
53632229 505 stream->session_id = session_id;
00e2e675
DG
506 strncpy(stream->path_name, path_name, sizeof(stream->path_name));
507 stream->path_name[sizeof(stream->path_name) - 1] = '\0';
53632229 508 pthread_mutex_init(&stream->lock, NULL);
58b1f425
DG
509
510 /*
511 * Index differently the metadata node because the thread is using an
512 * internal hash table to match streams in the metadata_ht to the epoll set
513 * file descriptor.
514 */
515 if (metadata_flag) {
516 lttng_ht_node_init_ulong(&stream->node, stream->wait_fd);
517 } else {
518 lttng_ht_node_init_ulong(&stream->node, stream->key);
519 }
c30aaa51 520
53632229
DG
521 /* Init session id node with the stream session id */
522 lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
523
c869f647
DG
524 /*
525 * The cpu number is needed before using any ustctl_* actions. Ignored for
526 * the kernel so the value does not matter.
527 */
528 pthread_mutex_lock(&consumer_data.lock);
529 stream->cpu = stream->chan->cpucount++;
530 pthread_mutex_unlock(&consumer_data.lock);
531
c30aaa51 532 DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
53632229
DG
533 " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
534 stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
c30aaa51 535 (unsigned long long) stream->mmap_len, stream->out_fd,
53632229 536 stream->net_seq_idx, stream->session_id);
3bd1e081 537 return stream;
c80048c6
MD
538
539error:
540 free(stream);
7a57cf92 541end:
c80048c6 542 return NULL;
3bd1e081
MD
543}
544
545/*
546 * Add a stream to the global list protected by a mutex.
547 */
43c34bc3
DG
548static int consumer_add_stream(struct lttng_consumer_stream *stream,
549 struct lttng_ht *ht)
3bd1e081
MD
550{
551 int ret = 0;
00e2e675 552 struct consumer_relayd_sock_pair *relayd;
3bd1e081 553
e316aad5 554 assert(stream);
43c34bc3 555 assert(ht);
c77fc10a 556
e316aad5
DG
557 DBG3("Adding consumer stream %d", stream->key);
558
559 pthread_mutex_lock(&consumer_data.lock);
b0b335c8 560 rcu_read_lock();
e316aad5 561
43c34bc3
DG
562 /* Steal stream identifier to avoid having streams with the same key */
563 consumer_steal_stream_key(stream->key, ht);
564
565 lttng_ht_add_unique_ulong(ht, &stream->node);
00e2e675 566
ca22feea
DG
567 /*
568 * Add stream to the stream_list_ht of the consumer data. No need to steal
569 * the key since the HT does not use it and we allow to add redundant keys
570 * into this table.
571 */
572 lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
573
00e2e675
DG
574 /* Check and cleanup relayd */
575 relayd = consumer_find_relayd(stream->net_seq_idx);
576 if (relayd != NULL) {
b0b335c8 577 uatomic_inc(&relayd->refcount);
00e2e675
DG
578 }
579
e316aad5
DG
580 /* Update channel refcount once added without error(s). */
581 uatomic_inc(&stream->chan->refcount);
582
583 /*
584 * When nb_init_streams reaches 0, we don't need to trigger any action in
585 * terms of destroying the associated channel, because the action that
586 * causes the count to become 0 also causes a stream to be added. The
587 * channel deletion will thus be triggered by the following removal of this
588 * stream.
589 */
590 if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
591 uatomic_dec(&stream->chan->nb_init_streams);
592 }
593
594 /* Update consumer data once the node is inserted. */
3bd1e081
MD
595 consumer_data.stream_count++;
596 consumer_data.need_update = 1;
597
e316aad5 598 rcu_read_unlock();
3bd1e081 599 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 600
3bd1e081
MD
601 return ret;
602}
603
00e2e675 604/*
3f8e211f
DG
605 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
606 * be acquired before calling this.
00e2e675 607 */
d09e1200 608static int add_relayd(struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
609{
610 int ret = 0;
611 struct lttng_ht_node_ulong *node;
612 struct lttng_ht_iter iter;
613
614 if (relayd == NULL) {
615 ret = -1;
616 goto end;
617 }
618
00e2e675
DG
619 lttng_ht_lookup(consumer_data.relayd_ht,
620 (void *)((unsigned long) relayd->net_seq_idx), &iter);
621 node = lttng_ht_iter_get_node_ulong(&iter);
622 if (node != NULL) {
00e2e675
DG
623 /* Relayd already exist. Ignore the insertion */
624 goto end;
625 }
626 lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
627
00e2e675
DG
628end:
629 return ret;
630}
631
632/*
633 * Allocate and return a consumer relayd socket.
634 */
635struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
636 int net_seq_idx)
637{
638 struct consumer_relayd_sock_pair *obj = NULL;
639
640 /* Negative net sequence index is a failure */
641 if (net_seq_idx < 0) {
642 goto error;
643 }
644
645 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
646 if (obj == NULL) {
647 PERROR("zmalloc relayd sock");
648 goto error;
649 }
650
651 obj->net_seq_idx = net_seq_idx;
652 obj->refcount = 0;
173af62f 653 obj->destroy_flag = 0;
00e2e675
DG
654 lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
655 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
656
657error:
658 return obj;
659}
660
661/*
662 * Find a relayd socket pair in the global consumer data.
663 *
664 * Return the object if found else NULL.
b0b335c8
MD
665 * RCU read-side lock must be held across this call and while using the
666 * returned object.
00e2e675
DG
667 */
668struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
669{
670 struct lttng_ht_iter iter;
671 struct lttng_ht_node_ulong *node;
672 struct consumer_relayd_sock_pair *relayd = NULL;
673
674 /* Negative keys are lookup failures */
675 if (key < 0) {
676 goto error;
677 }
678
00e2e675
DG
679 lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
680 &iter);
681 node = lttng_ht_iter_get_node_ulong(&iter);
682 if (node != NULL) {
683 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
684 }
685
00e2e675
DG
686error:
687 return relayd;
688}
689
690/*
691 * Handle stream for relayd transmission if the stream applies for network
692 * streaming where the net sequence index is set.
693 *
694 * Return destination file descriptor or negative value on error.
695 */
6197aea7 696static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
1d4dfdef
DG
697 size_t data_size, unsigned long padding,
698 struct consumer_relayd_sock_pair *relayd)
00e2e675
DG
699{
700 int outfd = -1, ret;
00e2e675
DG
701 struct lttcomm_relayd_data_hdr data_hdr;
702
703 /* Safety net */
704 assert(stream);
6197aea7 705 assert(relayd);
00e2e675
DG
706
707 /* Reset data header */
708 memset(&data_hdr, 0, sizeof(data_hdr));
709
00e2e675
DG
710 if (stream->metadata_flag) {
711 /* Caller MUST acquire the relayd control socket lock */
712 ret = relayd_send_metadata(&relayd->control_sock, data_size);
713 if (ret < 0) {
714 goto error;
715 }
716
717 /* Metadata are always sent on the control socket. */
718 outfd = relayd->control_sock.fd;
719 } else {
720 /* Set header with stream information */
721 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
722 data_hdr.data_size = htobe32(data_size);
1d4dfdef 723 data_hdr.padding_size = htobe32(padding);
173af62f 724 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
00e2e675
DG
725 /* Other fields are zeroed previously */
726
727 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
728 sizeof(data_hdr));
729 if (ret < 0) {
730 goto error;
731 }
732
733 /* Set to go on data socket */
734 outfd = relayd->data_sock.fd;
735 }
736
737error:
738 return outfd;
739}
740
702b1ea4
MD
741static
742void consumer_free_channel(struct rcu_head *head)
743{
744 struct lttng_ht_node_ulong *node =
745 caa_container_of(head, struct lttng_ht_node_ulong, head);
746 struct lttng_consumer_channel *channel =
747 caa_container_of(node, struct lttng_consumer_channel, node);
748
749 free(channel);
750}
751
3bd1e081
MD
752/*
753 * Remove a channel from the global list protected by a mutex. This
754 * function is also responsible for freeing its data structures.
755 */
756void consumer_del_channel(struct lttng_consumer_channel *channel)
757{
758 int ret;
e4421fec 759 struct lttng_ht_iter iter;
3bd1e081
MD
760
761 pthread_mutex_lock(&consumer_data.lock);
762
763 switch (consumer_data.type) {
764 case LTTNG_CONSUMER_KERNEL:
765 break;
7753dea8
MD
766 case LTTNG_CONSUMER32_UST:
767 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
768 lttng_ustconsumer_del_channel(channel);
769 break;
770 default:
771 ERR("Unknown consumer_data type");
772 assert(0);
773 goto end;
774 }
775
6065ceec 776 rcu_read_lock();
04253271
MD
777 iter.iter.node = &channel->node.node;
778 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
779 assert(!ret);
6065ceec
DG
780 rcu_read_unlock();
781
3bd1e081
MD
782 if (channel->mmap_base != NULL) {
783 ret = munmap(channel->mmap_base, channel->mmap_len);
784 if (ret != 0) {
7a57cf92 785 PERROR("munmap");
3bd1e081
MD
786 }
787 }
b5c5fc29 788 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
4c462e79
MD
789 ret = close(channel->wait_fd);
790 if (ret) {
791 PERROR("close");
792 }
3bd1e081 793 }
2c1dd183 794 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
4c462e79
MD
795 ret = close(channel->shm_fd);
796 if (ret) {
797 PERROR("close");
798 }
3bd1e081 799 }
702b1ea4
MD
800
801 call_rcu(&channel->node.head, consumer_free_channel);
3bd1e081
MD
802end:
803 pthread_mutex_unlock(&consumer_data.lock);
804}
805
806struct lttng_consumer_channel *consumer_allocate_channel(
807 int channel_key,
808 int shm_fd, int wait_fd,
809 uint64_t mmap_len,
c30aaa51
MD
810 uint64_t max_sb_size,
811 unsigned int nb_init_streams)
3bd1e081
MD
812{
813 struct lttng_consumer_channel *channel;
814 int ret;
815
276b26d1 816 channel = zmalloc(sizeof(*channel));
3bd1e081 817 if (channel == NULL) {
7a57cf92 818 PERROR("malloc struct lttng_consumer_channel");
3bd1e081
MD
819 goto end;
820 }
821 channel->key = channel_key;
822 channel->shm_fd = shm_fd;
823 channel->wait_fd = wait_fd;
824 channel->mmap_len = mmap_len;
825 channel->max_sb_size = max_sb_size;
826 channel->refcount = 0;
c30aaa51 827 channel->nb_init_streams = nb_init_streams;
e4421fec 828 lttng_ht_node_init_ulong(&channel->node, channel->key);
3bd1e081
MD
829
830 switch (consumer_data.type) {
831 case LTTNG_CONSUMER_KERNEL:
832 channel->mmap_base = NULL;
833 channel->mmap_len = 0;
834 break;
7753dea8
MD
835 case LTTNG_CONSUMER32_UST:
836 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
837 ret = lttng_ustconsumer_allocate_channel(channel);
838 if (ret) {
839 free(channel);
840 return NULL;
841 }
842 break;
843 default:
844 ERR("Unknown consumer_data type");
845 assert(0);
846 goto end;
847 }
848 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
00e2e675 849 channel->key, channel->shm_fd, channel->wait_fd,
3bd1e081
MD
850 (unsigned long long) channel->mmap_len,
851 (unsigned long long) channel->max_sb_size);
852end:
853 return channel;
854}
855
856/*
857 * Add a channel to the global list protected by a mutex.
858 */
859int consumer_add_channel(struct lttng_consumer_channel *channel)
860{
c77fc10a
DG
861 struct lttng_ht_node_ulong *node;
862 struct lttng_ht_iter iter;
863
3bd1e081 864 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
865 /* Steal channel identifier, for UST */
866 consumer_steal_channel_key(channel->key);
6065ceec 867 rcu_read_lock();
c77fc10a
DG
868
869 lttng_ht_lookup(consumer_data.channel_ht,
870 (void *)((unsigned long) channel->key), &iter);
871 node = lttng_ht_iter_get_node_ulong(&iter);
872 if (node != NULL) {
873 /* Channel already exist. Ignore the insertion */
874 goto end;
875 }
876
04253271 877 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
c77fc10a
DG
878
879end:
6065ceec 880 rcu_read_unlock();
3bd1e081 881 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 882
7ad0a0cb 883 return 0;
3bd1e081
MD
884}
885
886/*
887 * Allocate the pollfd structure and the local view of the out fds to avoid
888 * doing a lookup in the linked list and concurrency issues when writing is
889 * needed. Called with consumer_data.lock held.
890 *
891 * Returns the number of fds in the structures.
892 */
43c34bc3 893static int consumer_update_poll_array(
3bd1e081 894 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
43c34bc3 895 struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
3bd1e081 896{
3bd1e081 897 int i = 0;
e4421fec
DG
898 struct lttng_ht_iter iter;
899 struct lttng_consumer_stream *stream;
3bd1e081
MD
900
901 DBG("Updating poll fd array");
481d6c57 902 rcu_read_lock();
43c34bc3 903 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
8994307f
DG
904 /*
905 * Only active streams with an active end point can be added to the
906 * poll set and local stream storage of the thread.
907 *
908 * There is a potential race here for endpoint_status to be updated
909 * just after the check. However, this is OK since the stream(s) will
910 * be deleted once the thread is notified that the end point state has
911 * changed where this function will be called back again.
912 */
913 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
914 stream->endpoint_status) {
3bd1e081
MD
915 continue;
916 }
e4421fec
DG
917 DBG("Active FD %d", stream->wait_fd);
918 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 919 (*pollfd)[i].events = POLLIN | POLLPRI;
e4421fec 920 local_stream[i] = stream;
3bd1e081
MD
921 i++;
922 }
481d6c57 923 rcu_read_unlock();
3bd1e081
MD
924
925 /*
50f8ae69 926 * Insert the consumer_data_pipe at the end of the array and don't
3bd1e081
MD
927 * increment i so nb_fd is the number of real FD.
928 */
50f8ae69 929 (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
509bb1cf 930 (*pollfd)[i].events = POLLIN | POLLPRI;
3bd1e081
MD
931 return i;
932}
933
/*
 * Poll on the should_quit pipe and the command socket, i.e. the two entries
 * of consumer_sockpoll. An interrupted poll (EINTR) is restarted.
 *
 * Return -1 on poll error or when the first entry is readable, meaning that
 * the consumer should exit. Return 0 otherwise.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	/* Restart the system call for as long as it is interrupted. */
	do {
		num_rdy = poll(consumer_sockpoll, 2, -1);
	} while (num_rdy == -1 && errno == EINTR);

	if (num_rdy == -1) {
		PERROR("Poll error");
		return -1;
	}

	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		return -1;
	}

	return 0;
}
963
964/*
965 * Set the error socket.
966 */
967void lttng_consumer_set_error_sock(
968 struct lttng_consumer_local_data *ctx, int sock)
969{
970 ctx->consumer_error_socket = sock;
971}
972
973/*
974 * Set the command socket path.
975 */
3bd1e081
MD
976void lttng_consumer_set_command_sock_path(
977 struct lttng_consumer_local_data *ctx, char *sock)
978{
979 ctx->consumer_command_sock_path = sock;
980}
981
982/*
983 * Send return code to the session daemon.
984 * If the socket is not defined, we return 0, it is not a fatal error
985 */
986int lttng_consumer_send_error(
987 struct lttng_consumer_local_data *ctx, int cmd)
988{
989 if (ctx->consumer_error_socket > 0) {
990 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
991 sizeof(enum lttcomm_sessiond_command));
992 }
993
994 return 0;
995}
996
997/*
998 * Close all the tracefiles and stream fds, should be called when all instances
999 * are destroyed.
1000 */
1001void lttng_consumer_cleanup(void)
1002{
e4421fec 1003 struct lttng_ht_iter iter;
6065ceec
DG
1004 struct lttng_ht_node_ulong *node;
1005
1006 rcu_read_lock();
3bd1e081 1007
6065ceec
DG
1008 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
1009 node) {
702b1ea4
MD
1010 struct lttng_consumer_channel *channel =
1011 caa_container_of(node, struct lttng_consumer_channel, node);
1012 consumer_del_channel(channel);
3bd1e081 1013 }
6065ceec
DG
1014
1015 rcu_read_unlock();
d6ce1df2 1016
d6ce1df2 1017 lttng_ht_destroy(consumer_data.channel_ht);
3bd1e081
MD
1018}
1019
1020/*
1021 * Called from signal handler.
1022 */
1023void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1024{
1025 int ret;
1026 consumer_quit = 1;
6f94560a
MD
1027 do {
1028 ret = write(ctx->consumer_should_quit[1], "4", 1);
1029 } while (ret < 0 && errno == EINTR);
3bd1e081 1030 if (ret < 0) {
7a57cf92 1031 PERROR("write consumer quit");
3bd1e081 1032 }
ab1027f4
DG
1033
1034 DBG("Consumer flag that it should quit");
3bd1e081
MD
1035}
1036
00e2e675
DG
1037void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
1038 off_t orig_offset)
3bd1e081
MD
1039{
1040 int outfd = stream->out_fd;
1041
1042 /*
1043 * This does a blocking write-and-wait on any page that belongs to the
1044 * subbuffer prior to the one we just wrote.
1045 * Don't care about error values, as these are just hints and ways to
1046 * limit the amount of page cache used.
1047 */
1048 if (orig_offset < stream->chan->max_sb_size) {
1049 return;
1050 }
b9182dd9 1051 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
3bd1e081
MD
1052 stream->chan->max_sb_size,
1053 SYNC_FILE_RANGE_WAIT_BEFORE
1054 | SYNC_FILE_RANGE_WRITE
1055 | SYNC_FILE_RANGE_WAIT_AFTER);
1056 /*
1057 * Give hints to the kernel about how we access the file:
1058 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1059 * we write it.
1060 *
1061 * We need to call fadvise again after the file grows because the
1062 * kernel does not seem to apply fadvise to non-existing parts of the
1063 * file.
1064 *
1065 * Call fadvise _after_ having waited for the page writeback to
1066 * complete because the dirty page writeback semantic is not well
1067 * defined. So it can be expected to lead to lower throughput in
1068 * streaming.
1069 */
1070 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
1071 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
1072}
1073
1074/*
1075 * Initialise the necessary environnement :
1076 * - create a new context
1077 * - create the poll_pipe
1078 * - create the should_quit pipe (for signal handler)
1079 * - create the thread pipe (for splice)
1080 *
1081 * Takes a function pointer as argument, this function is called when data is
1082 * available on a buffer. This function is responsible to do the
1083 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1084 * buffer configuration and then kernctl_put_next_subbuf at the end.
1085 *
1086 * Returns a pointer to the new context or NULL on error.
1087 */
1088struct lttng_consumer_local_data *lttng_consumer_create(
1089 enum lttng_consumer_type type,
4078b776 1090 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
d41f73b7 1091 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
1092 int (*recv_channel)(struct lttng_consumer_channel *channel),
1093 int (*recv_stream)(struct lttng_consumer_stream *stream),
1094 int (*update_stream)(int stream_key, uint32_t state))
1095{
1096 int ret, i;
1097 struct lttng_consumer_local_data *ctx;
1098
1099 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
1100 consumer_data.type == type);
1101 consumer_data.type = type;
1102
effcf122 1103 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081 1104 if (ctx == NULL) {
7a57cf92 1105 PERROR("allocating context");
3bd1e081
MD
1106 goto error;
1107 }
1108
1109 ctx->consumer_error_socket = -1;
1110 /* assign the callbacks */
1111 ctx->on_buffer_ready = buffer_ready;
1112 ctx->on_recv_channel = recv_channel;
1113 ctx->on_recv_stream = recv_stream;
1114 ctx->on_update_stream = update_stream;
1115
50f8ae69 1116 ret = pipe(ctx->consumer_data_pipe);
3bd1e081 1117 if (ret < 0) {
7a57cf92 1118 PERROR("Error creating poll pipe");
3bd1e081
MD
1119 goto error_poll_pipe;
1120 }
1121
04fdd819 1122 /* set read end of the pipe to non-blocking */
50f8ae69 1123 ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
04fdd819 1124 if (ret < 0) {
7a57cf92 1125 PERROR("fcntl O_NONBLOCK");
04fdd819
MD
1126 goto error_poll_fcntl;
1127 }
1128
1129 /* set write end of the pipe to non-blocking */
50f8ae69 1130 ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
04fdd819 1131 if (ret < 0) {
7a57cf92 1132 PERROR("fcntl O_NONBLOCK");
04fdd819
MD
1133 goto error_poll_fcntl;
1134 }
1135
3bd1e081
MD
1136 ret = pipe(ctx->consumer_should_quit);
1137 if (ret < 0) {
7a57cf92 1138 PERROR("Error creating recv pipe");
3bd1e081
MD
1139 goto error_quit_pipe;
1140 }
1141
1142 ret = pipe(ctx->consumer_thread_pipe);
1143 if (ret < 0) {
7a57cf92 1144 PERROR("Error creating thread pipe");
3bd1e081
MD
1145 goto error_thread_pipe;
1146 }
1147
fb3a43a9
DG
1148 ret = utils_create_pipe(ctx->consumer_metadata_pipe);
1149 if (ret < 0) {
1150 goto error_metadata_pipe;
1151 }
3bd1e081 1152
fb3a43a9
DG
1153 ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
1154 if (ret < 0) {
1155 goto error_splice_pipe;
1156 }
1157
1158 return ctx;
3bd1e081 1159
fb3a43a9
DG
1160error_splice_pipe:
1161 utils_close_pipe(ctx->consumer_metadata_pipe);
1162error_metadata_pipe:
1163 utils_close_pipe(ctx->consumer_thread_pipe);
3bd1e081
MD
1164error_thread_pipe:
1165 for (i = 0; i < 2; i++) {
1166 int err;
1167
1168 err = close(ctx->consumer_should_quit[i]);
4c462e79
MD
1169 if (err) {
1170 PERROR("close");
1171 }
3bd1e081 1172 }
04fdd819 1173error_poll_fcntl:
3bd1e081
MD
1174error_quit_pipe:
1175 for (i = 0; i < 2; i++) {
1176 int err;
1177
50f8ae69 1178 err = close(ctx->consumer_data_pipe[i]);
4c462e79
MD
1179 if (err) {
1180 PERROR("close");
1181 }
3bd1e081
MD
1182 }
1183error_poll_pipe:
1184 free(ctx);
1185error:
1186 return NULL;
1187}
1188
1189/*
1190 * Close all fds associated with the instance and free the context.
1191 */
1192void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1193{
4c462e79
MD
1194 int ret;
1195
ab1027f4
DG
1196 DBG("Consumer destroying it. Closing everything.");
1197
4c462e79
MD
1198 ret = close(ctx->consumer_error_socket);
1199 if (ret) {
1200 PERROR("close");
1201 }
1202 ret = close(ctx->consumer_thread_pipe[0]);
1203 if (ret) {
1204 PERROR("close");
1205 }
1206 ret = close(ctx->consumer_thread_pipe[1]);
1207 if (ret) {
1208 PERROR("close");
1209 }
50f8ae69 1210 ret = close(ctx->consumer_data_pipe[0]);
4c462e79
MD
1211 if (ret) {
1212 PERROR("close");
1213 }
50f8ae69 1214 ret = close(ctx->consumer_data_pipe[1]);
4c462e79
MD
1215 if (ret) {
1216 PERROR("close");
1217 }
1218 ret = close(ctx->consumer_should_quit[0]);
1219 if (ret) {
1220 PERROR("close");
1221 }
1222 ret = close(ctx->consumer_should_quit[1]);
1223 if (ret) {
1224 PERROR("close");
1225 }
fb3a43a9
DG
1226 utils_close_pipe(ctx->consumer_splice_metadata_pipe);
1227
3bd1e081
MD
1228 unlink(ctx->consumer_command_sock_path);
1229 free(ctx);
1230}
1231
6197aea7
DG
1232/*
1233 * Write the metadata stream id on the specified file descriptor.
1234 */
1235static int write_relayd_metadata_id(int fd,
1236 struct lttng_consumer_stream *stream,
1d4dfdef
DG
1237 struct consumer_relayd_sock_pair *relayd,
1238 unsigned long padding)
6197aea7
DG
1239{
1240 int ret;
1d4dfdef 1241 struct lttcomm_relayd_metadata_payload hdr;
6197aea7 1242
1d4dfdef
DG
1243 hdr.stream_id = htobe64(stream->relayd_stream_id);
1244 hdr.padding_size = htobe32(padding);
6197aea7 1245 do {
1d4dfdef 1246 ret = write(fd, (void *) &hdr, sizeof(hdr));
6197aea7
DG
1247 } while (ret < 0 && errno == EINTR);
1248 if (ret < 0) {
1249 PERROR("write metadata stream id");
1250 goto end;
1251 }
1d4dfdef
DG
1252 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1253 stream->relayd_stream_id, padding);
6197aea7
DG
1254
1255end:
1256 return ret;
1257}
1258
3bd1e081 1259/*
09e26845
DG
1260 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1261 * core function for writing trace buffers to either the local filesystem or
1262 * the network.
1263 *
1264 * Careful review MUST be put if any changes occur!
3bd1e081
MD
1265 *
1266 * Returns the number of bytes written
1267 */
4078b776 1268ssize_t lttng_consumer_on_read_subbuffer_mmap(
3bd1e081 1269 struct lttng_consumer_local_data *ctx,
1d4dfdef
DG
1270 struct lttng_consumer_stream *stream, unsigned long len,
1271 unsigned long padding)
3bd1e081 1272{
f02e1e8a
DG
1273 unsigned long mmap_offset;
1274 ssize_t ret = 0, written = 0;
1275 off_t orig_offset = stream->out_fd_offset;
1276 /* Default is on the disk */
1277 int outfd = stream->out_fd;
f02e1e8a 1278 struct consumer_relayd_sock_pair *relayd = NULL;
8994307f 1279 unsigned int relayd_hang_up = 0;
f02e1e8a
DG
1280
1281 /* RCU lock for the relayd pointer */
1282 rcu_read_lock();
1283
c8f59ee5
DG
1284 pthread_mutex_lock(&stream->lock);
1285
f02e1e8a
DG
1286 /* Flag that the current stream if set for network streaming. */
1287 if (stream->net_seq_idx != -1) {
1288 relayd = consumer_find_relayd(stream->net_seq_idx);
1289 if (relayd == NULL) {
1290 goto end;
1291 }
1292 }
1293
1294 /* get the offset inside the fd to mmap */
3bd1e081
MD
1295 switch (consumer_data.type) {
1296 case LTTNG_CONSUMER_KERNEL:
f02e1e8a
DG
1297 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
1298 break;
7753dea8
MD
1299 case LTTNG_CONSUMER32_UST:
1300 case LTTNG_CONSUMER64_UST:
f02e1e8a
DG
1301 ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
1302 stream->buf, &mmap_offset);
1303 break;
3bd1e081
MD
1304 default:
1305 ERR("Unknown consumer_data type");
1306 assert(0);
1307 }
f02e1e8a
DG
1308 if (ret != 0) {
1309 errno = -ret;
1310 PERROR("tracer ctl get_mmap_read_offset");
1311 written = ret;
1312 goto end;
1313 }
b9182dd9 1314
f02e1e8a
DG
1315 /* Handle stream on the relayd if the output is on the network */
1316 if (relayd) {
1317 unsigned long netlen = len;
1318
1319 /*
1320 * Lock the control socket for the complete duration of the function
1321 * since from this point on we will use the socket.
1322 */
1323 if (stream->metadata_flag) {
1324 /* Metadata requires the control socket. */
1325 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1d4dfdef 1326 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
f02e1e8a
DG
1327 }
1328
1d4dfdef 1329 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
f02e1e8a
DG
1330 if (ret >= 0) {
1331 /* Use the returned socket. */
1332 outfd = ret;
1333
1334 /* Write metadata stream id before payload */
1335 if (stream->metadata_flag) {
1d4dfdef 1336 ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
f02e1e8a 1337 if (ret < 0) {
f02e1e8a 1338 written = ret;
8994307f
DG
1339 /* Socket operation failed. We consider the relayd dead */
1340 if (ret == -EPIPE || ret == -EINVAL) {
1341 relayd_hang_up = 1;
1342 goto write_error;
1343 }
f02e1e8a
DG
1344 goto end;
1345 }
f02e1e8a 1346 }
8994307f
DG
1347 } else {
1348 /* Socket operation failed. We consider the relayd dead */
1349 if (ret == -EPIPE || ret == -EINVAL) {
1350 relayd_hang_up = 1;
1351 goto write_error;
1352 }
1353 /* Else, use the default set before which is the filesystem. */
f02e1e8a 1354 }
1d4dfdef
DG
1355 } else {
1356 /* No streaming, we have to set the len with the full padding */
1357 len += padding;
f02e1e8a
DG
1358 }
1359
1360 while (len > 0) {
1361 do {
1362 ret = write(outfd, stream->mmap_base + mmap_offset, len);
1363 } while (ret < 0 && errno == EINTR);
1d4dfdef 1364 DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
f02e1e8a
DG
1365 if (ret < 0) {
1366 PERROR("Error in file write");
1367 if (written == 0) {
1368 written = ret;
1369 }
8994307f
DG
1370 /* Socket operation failed. We consider the relayd dead */
1371 if (errno == EPIPE || errno == EINVAL) {
1372 relayd_hang_up = 1;
1373 goto write_error;
1374 }
f02e1e8a
DG
1375 goto end;
1376 } else if (ret > len) {
77c7c900 1377 PERROR("Error in file write (ret %zd > len %lu)", ret, len);
f02e1e8a
DG
1378 written += ret;
1379 goto end;
1380 } else {
1381 len -= ret;
1382 mmap_offset += ret;
1383 }
f02e1e8a
DG
1384
1385 /* This call is useless on a socket so better save a syscall. */
1386 if (!relayd) {
1387 /* This won't block, but will start writeout asynchronously */
1388 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
1389 SYNC_FILE_RANGE_WRITE);
1390 stream->out_fd_offset += ret;
1391 }
1392 written += ret;
1393 }
1394 lttng_consumer_sync_trace_file(stream, orig_offset);
1395
8994307f
DG
1396write_error:
1397 /*
1398 * This is a special case that the relayd has closed its socket. Let's
1399 * cleanup the relayd object and all associated streams.
1400 */
1401 if (relayd && relayd_hang_up) {
1402 cleanup_relayd(relayd, ctx);
1403 }
1404
f02e1e8a
DG
1405end:
1406 /* Unlock only if ctrl socket used */
1407 if (relayd && stream->metadata_flag) {
1408 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1409 }
6f6eda74 1410 pthread_mutex_unlock(&stream->lock);
f02e1e8a
DG
1411
1412 rcu_read_unlock();
1413 return written;
3bd1e081
MD
1414}
1415
1416/*
1417 * Splice the data from the ring buffer to the tracefile.
1418 *
1419 * Returns the number of bytes spliced.
1420 */
4078b776 1421ssize_t lttng_consumer_on_read_subbuffer_splice(
3bd1e081 1422 struct lttng_consumer_local_data *ctx,
1d4dfdef
DG
1423 struct lttng_consumer_stream *stream, unsigned long len,
1424 unsigned long padding)
3bd1e081 1425{
f02e1e8a
DG
1426 ssize_t ret = 0, written = 0, ret_splice = 0;
1427 loff_t offset = 0;
1428 off_t orig_offset = stream->out_fd_offset;
1429 int fd = stream->wait_fd;
1430 /* Default is on the disk */
1431 int outfd = stream->out_fd;
f02e1e8a 1432 struct consumer_relayd_sock_pair *relayd = NULL;
fb3a43a9 1433 int *splice_pipe;
8994307f 1434 unsigned int relayd_hang_up = 0;
f02e1e8a 1435
3bd1e081
MD
1436 switch (consumer_data.type) {
1437 case LTTNG_CONSUMER_KERNEL:
f02e1e8a 1438 break;
7753dea8
MD
1439 case LTTNG_CONSUMER32_UST:
1440 case LTTNG_CONSUMER64_UST:
f02e1e8a 1441 /* Not supported for user space tracing */
3bd1e081
MD
1442 return -ENOSYS;
1443 default:
1444 ERR("Unknown consumer_data type");
1445 assert(0);
3bd1e081
MD
1446 }
1447
f02e1e8a
DG
1448 /* RCU lock for the relayd pointer */
1449 rcu_read_lock();
1450
c8f59ee5
DG
1451 pthread_mutex_lock(&stream->lock);
1452
f02e1e8a
DG
1453 /* Flag that the current stream if set for network streaming. */
1454 if (stream->net_seq_idx != -1) {
1455 relayd = consumer_find_relayd(stream->net_seq_idx);
1456 if (relayd == NULL) {
1457 goto end;
1458 }
1459 }
1460
fb3a43a9
DG
1461 /*
1462 * Choose right pipe for splice. Metadata and trace data are handled by
1463 * different threads hence the use of two pipes in order not to race or
1464 * corrupt the written data.
1465 */
1466 if (stream->metadata_flag) {
1467 splice_pipe = ctx->consumer_splice_metadata_pipe;
1468 } else {
1469 splice_pipe = ctx->consumer_thread_pipe;
1470 }
1471
f02e1e8a 1472 /* Write metadata stream id before payload */
1d4dfdef
DG
1473 if (relayd) {
1474 int total_len = len;
f02e1e8a 1475
1d4dfdef
DG
1476 if (stream->metadata_flag) {
1477 /*
1478 * Lock the control socket for the complete duration of the function
1479 * since from this point on we will use the socket.
1480 */
1481 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1482
1483 ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
1484 padding);
1485 if (ret < 0) {
1486 written = ret;
8994307f
DG
1487 /* Socket operation failed. We consider the relayd dead */
1488 if (ret == -EBADF) {
1489 WARN("Remote relayd disconnected. Stopping");
1490 relayd_hang_up = 1;
1491 goto write_error;
1492 }
1d4dfdef
DG
1493 goto end;
1494 }
1495
1496 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1497 }
1498
1499 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1500 if (ret >= 0) {
1501 /* Use the returned socket. */
1502 outfd = ret;
1503 } else {
8994307f
DG
1504 /* Socket operation failed. We consider the relayd dead */
1505 if (ret == -EBADF) {
1506 WARN("Remote relayd disconnected. Stopping");
1507 relayd_hang_up = 1;
1508 goto write_error;
1509 }
f02e1e8a
DG
1510 goto end;
1511 }
1d4dfdef
DG
1512 } else {
1513 /* No streaming, we have to set the len with the full padding */
1514 len += padding;
f02e1e8a
DG
1515 }
1516
1517 while (len > 0) {
1d4dfdef
DG
1518 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1519 (unsigned long)offset, len, fd, splice_pipe[1]);
fb3a43a9 1520 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
f02e1e8a
DG
1521 SPLICE_F_MOVE | SPLICE_F_MORE);
1522 DBG("splice chan to pipe, ret %zd", ret_splice);
1523 if (ret_splice < 0) {
1524 PERROR("Error in relay splice");
1525 if (written == 0) {
1526 written = ret_splice;
1527 }
1528 ret = errno;
1529 goto splice_error;
1530 }
1531
1532 /* Handle stream on the relayd if the output is on the network */
1533 if (relayd) {
1534 if (stream->metadata_flag) {
1d4dfdef
DG
1535 size_t metadata_payload_size =
1536 sizeof(struct lttcomm_relayd_metadata_payload);
1537
f02e1e8a 1538 /* Update counter to fit the spliced data */
1d4dfdef
DG
1539 ret_splice += metadata_payload_size;
1540 len += metadata_payload_size;
f02e1e8a
DG
1541 /*
1542 * We do this so the return value can match the len passed as
1543 * argument to this function.
1544 */
1d4dfdef 1545 written -= metadata_payload_size;
f02e1e8a
DG
1546 }
1547 }
1548
1549 /* Splice data out */
fb3a43a9 1550 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
f02e1e8a 1551 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
1d4dfdef 1552 DBG("Consumer splice pipe to file, ret %zd", ret_splice);
f02e1e8a
DG
1553 if (ret_splice < 0) {
1554 PERROR("Error in file splice");
1555 if (written == 0) {
1556 written = ret_splice;
1557 }
8994307f 1558 /* Socket operation failed. We consider the relayd dead */
00c8752b 1559 if (errno == EBADF || errno == EPIPE) {
8994307f
DG
1560 WARN("Remote relayd disconnected. Stopping");
1561 relayd_hang_up = 1;
1562 goto write_error;
1563 }
f02e1e8a
DG
1564 ret = errno;
1565 goto splice_error;
1566 } else if (ret_splice > len) {
1567 errno = EINVAL;
1568 PERROR("Wrote more data than requested %zd (len: %lu)",
1569 ret_splice, len);
1570 written += ret_splice;
1571 ret = errno;
1572 goto splice_error;
1573 }
1574 len -= ret_splice;
1575
1576 /* This call is useless on a socket so better save a syscall. */
1577 if (!relayd) {
1578 /* This won't block, but will start writeout asynchronously */
1579 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
1580 SYNC_FILE_RANGE_WRITE);
1581 stream->out_fd_offset += ret_splice;
1582 }
1583 written += ret_splice;
1584 }
1585 lttng_consumer_sync_trace_file(stream, orig_offset);
1586
1587 ret = ret_splice;
1588
1589 goto end;
1590
8994307f
DG
1591write_error:
1592 /*
1593 * This is a special case that the relayd has closed its socket. Let's
1594 * cleanup the relayd object and all associated streams.
1595 */
1596 if (relayd && relayd_hang_up) {
1597 cleanup_relayd(relayd, ctx);
1598 /* Skip splice error so the consumer does not fail */
1599 goto end;
1600 }
1601
f02e1e8a
DG
1602splice_error:
1603 /* send the appropriate error description to sessiond */
1604 switch (ret) {
f02e1e8a 1605 case EINVAL:
f73fabfd 1606 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
f02e1e8a
DG
1607 break;
1608 case ENOMEM:
f73fabfd 1609 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
f02e1e8a
DG
1610 break;
1611 case ESPIPE:
f73fabfd 1612 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
f02e1e8a
DG
1613 break;
1614 }
1615
1616end:
1617 if (relayd && stream->metadata_flag) {
1618 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1619 }
6f6eda74 1620 pthread_mutex_unlock(&stream->lock);
f02e1e8a
DG
1621
1622 rcu_read_unlock();
1623 return written;
3bd1e081
MD
1624}
1625
1626/*
1627 * Take a snapshot for a specific fd
1628 *
1629 * Returns 0 on success, < 0 on error
1630 */
1631int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
1632 struct lttng_consumer_stream *stream)
1633{
1634 switch (consumer_data.type) {
1635 case LTTNG_CONSUMER_KERNEL:
1636 return lttng_kconsumer_take_snapshot(ctx, stream);
7753dea8
MD
1637 case LTTNG_CONSUMER32_UST:
1638 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1639 return lttng_ustconsumer_take_snapshot(ctx, stream);
1640 default:
1641 ERR("Unknown consumer_data type");
1642 assert(0);
1643 return -ENOSYS;
1644 }
1645
1646}
1647
1648/*
1649 * Get the produced position
1650 *
1651 * Returns 0 on success, < 0 on error
1652 */
1653int lttng_consumer_get_produced_snapshot(
1654 struct lttng_consumer_local_data *ctx,
1655 struct lttng_consumer_stream *stream,
1656 unsigned long *pos)
1657{
1658 switch (consumer_data.type) {
1659 case LTTNG_CONSUMER_KERNEL:
1660 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
7753dea8
MD
1661 case LTTNG_CONSUMER32_UST:
1662 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1663 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
1664 default:
1665 ERR("Unknown consumer_data type");
1666 assert(0);
1667 return -ENOSYS;
1668 }
1669}
1670
1671int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1672 int sock, struct pollfd *consumer_sockpoll)
1673{
1674 switch (consumer_data.type) {
1675 case LTTNG_CONSUMER_KERNEL:
1676 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
1677 case LTTNG_CONSUMER32_UST:
1678 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1679 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1680 default:
1681 ERR("Unknown consumer_data type");
1682 assert(0);
1683 return -ENOSYS;
1684 }
1685}
1686
43c34bc3
DG
1687/*
1688 * Iterate over all streams of the hashtable and free them properly.
1689 *
1690 * WARNING: *MUST* be used with data stream only.
1691 */
1692static void destroy_data_stream_ht(struct lttng_ht *ht)
1693{
1694 int ret;
1695 struct lttng_ht_iter iter;
1696 struct lttng_consumer_stream *stream;
1697
1698 if (ht == NULL) {
1699 return;
1700 }
1701
1702 rcu_read_lock();
1703 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1704 ret = lttng_ht_del(ht, &iter);
1705 assert(!ret);
1706
1707 call_rcu(&stream->node.head, consumer_free_stream);
1708 }
1709 rcu_read_unlock();
1710
1711 lttng_ht_destroy(ht);
1712}
1713
fb3a43a9 1714/*
f724d81e 1715 * Iterate over all streams of the hashtable and free them properly.
e316aad5
DG
1716 *
1717 * XXX: Should not be only for metadata stream or else use an other name.
fb3a43a9
DG
1718 */
1719static void destroy_stream_ht(struct lttng_ht *ht)
1720{
1721 int ret;
1722 struct lttng_ht_iter iter;
1723 struct lttng_consumer_stream *stream;
1724
1725 if (ht == NULL) {
1726 return;
1727 }
1728
d09e1200 1729 rcu_read_lock();
58b1f425 1730 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
fb3a43a9
DG
1731 ret = lttng_ht_del(ht, &iter);
1732 assert(!ret);
1733
58b1f425 1734 call_rcu(&stream->node.head, consumer_free_stream);
fb3a43a9 1735 }
d09e1200 1736 rcu_read_unlock();
fb3a43a9
DG
1737
1738 lttng_ht_destroy(ht);
1739}
1740
1741/*
1742 * Clean up a metadata stream and free its memory.
1743 */
e316aad5
DG
1744void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
1745 struct lttng_ht *ht)
fb3a43a9
DG
1746{
1747 int ret;
e316aad5
DG
1748 struct lttng_ht_iter iter;
1749 struct lttng_consumer_channel *free_chan = NULL;
fb3a43a9
DG
1750 struct consumer_relayd_sock_pair *relayd;
1751
1752 assert(stream);
1753 /*
1754 * This call should NEVER receive regular stream. It must always be
1755 * metadata stream and this is crucial for data structure synchronization.
1756 */
1757 assert(stream->metadata_flag);
1758
e316aad5
DG
1759 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
1760
1761 if (ht == NULL) {
1762 /* Means the stream was allocated but not successfully added */
1763 goto free_stream;
1764 }
1765
8994307f
DG
1766 pthread_mutex_lock(&stream->lock);
1767
fb3a43a9
DG
1768 pthread_mutex_lock(&consumer_data.lock);
1769 switch (consumer_data.type) {
1770 case LTTNG_CONSUMER_KERNEL:
1771 if (stream->mmap_base != NULL) {
1772 ret = munmap(stream->mmap_base, stream->mmap_len);
1773 if (ret != 0) {
1774 PERROR("munmap metadata stream");
1775 }
1776 }
1777 break;
1778 case LTTNG_CONSUMER32_UST:
1779 case LTTNG_CONSUMER64_UST:
1780 lttng_ustconsumer_del_stream(stream);
1781 break;
1782 default:
1783 ERR("Unknown consumer_data type");
1784 assert(0);
e316aad5 1785 goto end;
fb3a43a9 1786 }
fb3a43a9 1787
c869f647 1788 rcu_read_lock();
58b1f425 1789 iter.iter.node = &stream->node.node;
c869f647
DG
1790 ret = lttng_ht_del(ht, &iter);
1791 assert(!ret);
ca22feea
DG
1792
1793 /* Remove node session id from the consumer_data stream ht */
1794 iter.iter.node = &stream->node_session_id.node;
1795 ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
1796 assert(!ret);
c869f647
DG
1797 rcu_read_unlock();
1798
fb3a43a9
DG
1799 if (stream->out_fd >= 0) {
1800 ret = close(stream->out_fd);
1801 if (ret) {
1802 PERROR("close");
1803 }
1804 }
1805
1806 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
1807 ret = close(stream->wait_fd);
1808 if (ret) {
1809 PERROR("close");
1810 }
1811 }
1812
1813 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
1814 ret = close(stream->shm_fd);
1815 if (ret) {
1816 PERROR("close");
1817 }
1818 }
1819
1820 /* Check and cleanup relayd */
1821 rcu_read_lock();
1822 relayd = consumer_find_relayd(stream->net_seq_idx);
1823 if (relayd != NULL) {
1824 uatomic_dec(&relayd->refcount);
1825 assert(uatomic_read(&relayd->refcount) >= 0);
1826
1827 /* Closing streams requires to lock the control socket. */
1828 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1829 ret = relayd_send_close_stream(&relayd->control_sock,
1830 stream->relayd_stream_id, stream->next_net_seq_num - 1);
1831 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1832 if (ret < 0) {
1833 DBG("Unable to close stream on the relayd. Continuing");
1834 /*
1835 * Continue here. There is nothing we can do for the relayd.
1836 * Chances are that the relayd has closed the socket so we just
1837 * continue cleaning up.
1838 */
1839 }
1840
1841 /* Both conditions are met, we destroy the relayd. */
1842 if (uatomic_read(&relayd->refcount) == 0 &&
1843 uatomic_read(&relayd->destroy_flag)) {
d09e1200 1844 destroy_relayd(relayd);
fb3a43a9
DG
1845 }
1846 }
1847 rcu_read_unlock();
1848
1849 /* Atomically decrement channel refcount since other threads can use it. */
1850 uatomic_dec(&stream->chan->refcount);
c30aaa51
MD
1851 if (!uatomic_read(&stream->chan->refcount)
1852 && !uatomic_read(&stream->chan->nb_init_streams)) {
1853 /* Go for channel deletion! */
e316aad5 1854 free_chan = stream->chan;
fb3a43a9
DG
1855 }
1856
e316aad5
DG
1857end:
1858 pthread_mutex_unlock(&consumer_data.lock);
8994307f 1859 pthread_mutex_unlock(&stream->lock);
e316aad5
DG
1860
1861 if (free_chan) {
1862 consumer_del_channel(free_chan);
1863 }
1864
1865free_stream:
58b1f425 1866 call_rcu(&stream->node.head, consumer_free_stream);
fb3a43a9
DG
1867}
1868
1869/*
1870 * Action done with the metadata stream when adding it to the consumer internal
1871 * data structures to handle it.
1872 */
e316aad5
DG
1873static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
1874 struct lttng_ht *ht)
fb3a43a9 1875{
e316aad5 1876 int ret = 0;
fb3a43a9
DG
1877 struct consumer_relayd_sock_pair *relayd;
1878
e316aad5
DG
1879 assert(stream);
1880 assert(ht);
1881
1882 DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
1883
1884 pthread_mutex_lock(&consumer_data.lock);
1885
e316aad5
DG
1886 /*
1887 * From here, refcounts are updated so be _careful_ when returning an error
1888 * after this point.
1889 */
1890
fb3a43a9 1891 rcu_read_lock();
e316aad5 1892 /* Find relayd and, if one is found, increment refcount. */
fb3a43a9
DG
1893 relayd = consumer_find_relayd(stream->net_seq_idx);
1894 if (relayd != NULL) {
1895 uatomic_inc(&relayd->refcount);
1896 }
e316aad5
DG
1897
1898 /* Update channel refcount once added without error(s). */
1899 uatomic_inc(&stream->chan->refcount);
1900
1901 /*
1902 * When nb_init_streams reaches 0, we don't need to trigger any action in
1903 * terms of destroying the associated channel, because the action that
1904 * causes the count to become 0 also causes a stream to be added. The
1905 * channel deletion will thus be triggered by the following removal of this
1906 * stream.
1907 */
1908 if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
1909 uatomic_dec(&stream->chan->nb_init_streams);
1910 }
1911
43c34bc3
DG
1912 /* Steal stream identifier to avoid having streams with the same key */
1913 consumer_steal_stream_key(stream->key, ht);
1914
58b1f425 1915 lttng_ht_add_unique_ulong(ht, &stream->node);
ca22feea
DG
1916
1917 /*
1918 * Add stream to the stream_list_ht of the consumer data. No need to steal
1919 * the key since the HT does not use it and we allow to add redundant keys
1920 * into this table.
1921 */
1922 lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
1923
fb3a43a9 1924 rcu_read_unlock();
e316aad5 1925
e316aad5
DG
1926 pthread_mutex_unlock(&consumer_data.lock);
1927 return ret;
fb3a43a9
DG
1928}
1929
8994307f
DG
1930/*
1931 * Delete data stream that are flagged for deletion (endpoint_status).
1932 */
1933static void validate_endpoint_status_data_stream(void)
1934{
1935 struct lttng_ht_iter iter;
1936 struct lttng_consumer_stream *stream;
1937
1938 DBG("Consumer delete flagged data stream");
1939
1940 rcu_read_lock();
1941 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
1942 /* Validate delete flag of the stream */
9617607b 1943 if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
8994307f
DG
1944 continue;
1945 }
1946 /* Delete it right now */
1947 consumer_del_stream(stream, data_ht);
1948 }
1949 rcu_read_unlock();
1950}
1951
1952/*
1953 * Delete metadata stream that are flagged for deletion (endpoint_status).
1954 */
1955static void validate_endpoint_status_metadata_stream(
1956 struct lttng_poll_event *pollset)
1957{
1958 struct lttng_ht_iter iter;
1959 struct lttng_consumer_stream *stream;
1960
1961 DBG("Consumer delete flagged metadata stream");
1962
1963 assert(pollset);
1964
1965 rcu_read_lock();
1966 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
1967 /* Validate delete flag of the stream */
1968 if (!stream->endpoint_status) {
1969 continue;
1970 }
1971 /*
1972 * Remove from pollset so the metadata thread can continue without
1973 * blocking on a deleted stream.
1974 */
1975 lttng_poll_del(pollset, stream->wait_fd);
1976
1977 /* Delete it right now */
1978 consumer_del_metadata_stream(stream, metadata_ht);
1979 }
1980 rcu_read_unlock();
1981}
1982
fb3a43a9
DG
1983/*
1984 * Thread polls on metadata file descriptor and write them on disk or on the
1985 * network.
1986 */
7d980def 1987void *consumer_thread_metadata_poll(void *data)
fb3a43a9
DG
1988{
1989 int ret, i, pollfd;
1990 uint32_t revents, nb_fd;
e316aad5 1991 struct lttng_consumer_stream *stream = NULL;
fb3a43a9
DG
1992 struct lttng_ht_iter iter;
1993 struct lttng_ht_node_ulong *node;
fb3a43a9
DG
1994 struct lttng_poll_event events;
1995 struct lttng_consumer_local_data *ctx = data;
1996 ssize_t len;
1997
1998 rcu_register_thread();
1999
2000 DBG("Thread metadata poll started");
2001
fb3a43a9
DG
2002 /* Size is set to 1 for the consumer_metadata pipe */
2003 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2004 if (ret < 0) {
2005 ERR("Poll set creation failed");
2006 goto end;
2007 }
2008
2009 ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
2010 if (ret < 0) {
2011 goto end;
2012 }
2013
2014 /* Main loop */
2015 DBG("Metadata main loop started");
2016
2017 while (1) {
2018 lttng_poll_reset(&events);
2019
2020 nb_fd = LTTNG_POLL_GETNB(&events);
2021
2022 /* Only the metadata pipe is set */
2023 if (nb_fd == 0 && consumer_quit == 1) {
2024 goto end;
2025 }
2026
2027restart:
2028 DBG("Metadata poll wait with %d fd(s)", nb_fd);
2029 ret = lttng_poll_wait(&events, -1);
2030 DBG("Metadata event catched in thread");
2031 if (ret < 0) {
2032 if (errno == EINTR) {
e316aad5 2033 ERR("Poll EINTR catched");
fb3a43a9
DG
2034 goto restart;
2035 }
2036 goto error;
2037 }
2038
e316aad5 2039 /* From here, the event is a metadata wait fd */
fb3a43a9
DG
2040 for (i = 0; i < nb_fd; i++) {
2041 revents = LTTNG_POLL_GETEV(&events, i);
2042 pollfd = LTTNG_POLL_GETFD(&events, i);
2043
e316aad5
DG
2044 /* Just don't waste time if no returned events for the fd */
2045 if (!revents) {
2046 continue;
2047 }
2048
fb3a43a9 2049 if (pollfd == ctx->consumer_metadata_pipe[0]) {
4adabd61 2050 if (revents & (LPOLLERR | LPOLLHUP )) {
fb3a43a9
DG
2051 DBG("Metadata thread pipe hung up");
2052 /*
2053 * Remove the pipe from the poll set and continue the loop
2054 * since their might be data to consume.
2055 */
2056 lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
2057 close(ctx->consumer_metadata_pipe[0]);
2058 continue;
2059 } else if (revents & LPOLLIN) {
fb3a43a9 2060 do {
633d0084
DG
2061 /* Get the stream pointer received */
2062 ret = read(pollfd, &stream, sizeof(stream));
fb3a43a9 2063 } while (ret < 0 && errno == EINTR);
633d0084
DG
2064 if (ret < 0 ||
2065 ret < sizeof(struct lttng_consumer_stream *)) {
fb3a43a9 2066 PERROR("read metadata stream");
fb3a43a9
DG
2067 /*
2068 * Let's continue here and hope we can still work
2069 * without stopping the consumer. XXX: Should we?
2070 */
2071 continue;
2072 }
2073
8994307f
DG
2074 /* A NULL stream means that the state has changed. */
2075 if (stream == NULL) {
2076 /* Check for deleted streams. */
2077 validate_endpoint_status_metadata_stream(&events);
2078 continue;
2079 }
2080
fb3a43a9
DG
2081 DBG("Adding metadata stream %d to poll set",
2082 stream->wait_fd);
2083
e316aad5
DG
2084 ret = consumer_add_metadata_stream(stream, metadata_ht);
2085 if (ret) {
2086 ERR("Unable to add metadata stream");
2087 /* Stream was not setup properly. Continuing. */
2088 consumer_del_metadata_stream(stream, NULL);
2089 continue;
2090 }
fb3a43a9
DG
2091
2092 /* Add metadata stream to the global poll events list */
2093 lttng_poll_add(&events, stream->wait_fd,
2094 LPOLLIN | LPOLLPRI);
fb3a43a9
DG
2095 }
2096
e316aad5 2097 /* Handle other stream */
fb3a43a9
DG
2098 continue;
2099 }
2100
d09e1200 2101 rcu_read_lock();
fb3a43a9
DG
2102 lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
2103 &iter);
2104 node = lttng_ht_iter_get_node_ulong(&iter);
e316aad5 2105 assert(node);
fb3a43a9
DG
2106
2107 stream = caa_container_of(node, struct lttng_consumer_stream,
58b1f425 2108 node);
fb3a43a9 2109
e316aad5 2110 /* Check for error event */
4adabd61 2111 if (revents & (LPOLLERR | LPOLLHUP)) {
e316aad5 2112 DBG("Metadata fd %d is hup|err.", pollfd);
fb3a43a9
DG
2113 if (!stream->hangup_flush_done
2114 && (consumer_data.type == LTTNG_CONSUMER32_UST
2115 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2116 DBG("Attempting to flush and consume the UST buffers");
2117 lttng_ustconsumer_on_stream_hangup(stream);
2118
2119 /* We just flushed the stream now read it. */
4bb94b75
DG
2120 do {
2121 len = ctx->on_buffer_ready(stream, ctx);
2122 /*
2123 * We don't check the return value here since if we get
2124 * a negative len, it means an error occured thus we
2125 * simply remove it from the poll set and free the
2126 * stream.
2127 */
2128 } while (len > 0);
fb3a43a9
DG
2129 }
2130
fb3a43a9 2131 lttng_poll_del(&events, stream->wait_fd);
e316aad5
DG
2132 /*
2133 * This call update the channel states, closes file descriptors
2134 * and securely free the stream.
2135 */
2136 consumer_del_metadata_stream(stream, metadata_ht);
2137 } else if (revents & (LPOLLIN | LPOLLPRI)) {
2138 /* Get the data out of the metadata file descriptor */
2139 DBG("Metadata available on fd %d", pollfd);
2140 assert(stream->wait_fd == pollfd);
2141
2142 len = ctx->on_buffer_ready(stream, ctx);
2143 /* It's ok to have an unavailable sub-buffer */
b64403e3 2144 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2145 /* Clean up stream from consumer and free it. */
2146 lttng_poll_del(&events, stream->wait_fd);
2147 consumer_del_metadata_stream(stream, metadata_ht);
e316aad5
DG
2148 } else if (len > 0) {
2149 stream->data_read = 1;
2150 }
fb3a43a9 2151 }
e316aad5
DG
2152
2153 /* Release RCU lock for the stream looked up */
d09e1200 2154 rcu_read_unlock();
fb3a43a9
DG
2155 }
2156 }
2157
2158error:
2159end:
2160 DBG("Metadata poll thread exiting");
2161 lttng_poll_clean(&events);
2162
2163 if (metadata_ht) {
2164 destroy_stream_ht(metadata_ht);
2165 }
2166
2167 rcu_unregister_thread();
2168 return NULL;
2169}
2170
3bd1e081 2171/*
e4421fec 2172 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
2173 * it to tracefile if necessary.
2174 */
7d980def 2175void *consumer_thread_data_poll(void *data)
3bd1e081
MD
2176{
2177 int num_rdy, num_hup, high_prio, ret, i;
2178 struct pollfd *pollfd = NULL;
2179 /* local view of the streams */
c869f647 2180 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
3bd1e081
MD
2181 /* local view of consumer_data.fds_count */
2182 int nb_fd = 0;
3bd1e081 2183 struct lttng_consumer_local_data *ctx = data;
00e2e675 2184 ssize_t len;
3bd1e081 2185
e7b994a3
DG
2186 rcu_register_thread();
2187
43c34bc3
DG
2188 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2189 if (data_ht == NULL) {
2190 goto end;
2191 }
2192
effcf122 2193 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
2194
2195 while (1) {
2196 high_prio = 0;
2197 num_hup = 0;
2198
2199 /*
e4421fec 2200 * the fds set has been updated, we need to update our
3bd1e081
MD
2201 * local array as well
2202 */
2203 pthread_mutex_lock(&consumer_data.lock);
2204 if (consumer_data.need_update) {
2205 if (pollfd != NULL) {
2206 free(pollfd);
2207 pollfd = NULL;
2208 }
2209 if (local_stream != NULL) {
2210 free(local_stream);
2211 local_stream = NULL;
2212 }
2213
50f8ae69 2214 /* allocate for all fds + 1 for the consumer_data_pipe */
effcf122 2215 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081 2216 if (pollfd == NULL) {
7a57cf92 2217 PERROR("pollfd malloc");
3bd1e081
MD
2218 pthread_mutex_unlock(&consumer_data.lock);
2219 goto end;
2220 }
2221
50f8ae69 2222 /* allocate for all fds + 1 for the consumer_data_pipe */
effcf122 2223 local_stream = zmalloc((consumer_data.stream_count + 1) *
3bd1e081
MD
2224 sizeof(struct lttng_consumer_stream));
2225 if (local_stream == NULL) {
7a57cf92 2226 PERROR("local_stream malloc");
3bd1e081
MD
2227 pthread_mutex_unlock(&consumer_data.lock);
2228 goto end;
2229 }
43c34bc3
DG
2230 ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
2231 data_ht);
3bd1e081
MD
2232 if (ret < 0) {
2233 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 2234 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2235 pthread_mutex_unlock(&consumer_data.lock);
2236 goto end;
2237 }
2238 nb_fd = ret;
2239 consumer_data.need_update = 0;
2240 }
2241 pthread_mutex_unlock(&consumer_data.lock);
2242
4078b776
MD
2243 /* No FDs and consumer_quit, consumer_cleanup the thread */
2244 if (nb_fd == 0 && consumer_quit == 1) {
2245 goto end;
2246 }
3bd1e081 2247 /* poll on the array of fds */
88f2b785 2248 restart:
3bd1e081 2249 DBG("polling on %d fd", nb_fd + 1);
cb365c03 2250 num_rdy = poll(pollfd, nb_fd + 1, -1);
3bd1e081
MD
2251 DBG("poll num_rdy : %d", num_rdy);
2252 if (num_rdy == -1) {
88f2b785
MD
2253 /*
2254 * Restart interrupted system call.
2255 */
2256 if (errno == EINTR) {
2257 goto restart;
2258 }
7a57cf92 2259 PERROR("Poll error");
f73fabfd 2260 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
2261 goto end;
2262 } else if (num_rdy == 0) {
2263 DBG("Polling thread timed out");
2264 goto end;
2265 }
2266
3bd1e081 2267 /*
50f8ae69 2268 * If the consumer_data_pipe triggered poll go directly to the
00e2e675
DG
2269 * beginning of the loop to update the array. We want to prioritize
2270 * array update over low-priority reads.
3bd1e081 2271 */
509bb1cf 2272 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
04fdd819 2273 size_t pipe_readlen;
04fdd819 2274
50f8ae69 2275 DBG("consumer_data_pipe wake up");
04fdd819
MD
2276 /* Consume 1 byte of pipe data */
2277 do {
50f8ae69 2278 pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
c869f647 2279 sizeof(new_stream));
04fdd819 2280 } while (pipe_readlen == -1 && errno == EINTR);
c869f647
DG
2281
2282 /*
2283 * If the stream is NULL, just ignore it. It's also possible that
2284 * the sessiond poll thread changed the consumer_quit state and is
2285 * waking us up to test it.
2286 */
2287 if (new_stream == NULL) {
8994307f 2288 validate_endpoint_status_data_stream();
c869f647
DG
2289 continue;
2290 }
2291
43c34bc3 2292 ret = consumer_add_stream(new_stream, data_ht);
c869f647
DG
2293 if (ret) {
2294 ERR("Consumer add stream %d failed. Continuing",
2295 new_stream->key);
2296 /*
2297 * At this point, if the add_stream fails, it is not in the
2298 * hash table thus passing the NULL value here.
2299 */
2300 consumer_del_stream(new_stream, NULL);
2301 }
2302
2303 /* Continue to update the local streams and handle prio ones */
3bd1e081
MD
2304 continue;
2305 }
2306
2307 /* Take care of high priority channels first. */
2308 for (i = 0; i < nb_fd; i++) {
9617607b
DG
2309 if (local_stream[i] == NULL) {
2310 continue;
2311 }
fb3a43a9 2312 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
2313 DBG("Urgent read on fd %d", pollfd[i].fd);
2314 high_prio = 1;
4078b776 2315 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 2316 /* it's ok to have an unavailable sub-buffer */
b64403e3 2317 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2318 /* Clean the stream and free it. */
2319 consumer_del_stream(local_stream[i], data_ht);
9617607b 2320 local_stream[i] = NULL;
4078b776
MD
2321 } else if (len > 0) {
2322 local_stream[i]->data_read = 1;
d41f73b7 2323 }
3bd1e081
MD
2324 }
2325 }
2326
4078b776
MD
2327 /*
2328 * If we read high prio channel in this loop, try again
2329 * for more high prio data.
2330 */
2331 if (high_prio) {
3bd1e081
MD
2332 continue;
2333 }
2334
2335 /* Take care of low priority channels. */
4078b776 2336 for (i = 0; i < nb_fd; i++) {
9617607b
DG
2337 if (local_stream[i] == NULL) {
2338 continue;
2339 }
4078b776
MD
2340 if ((pollfd[i].revents & POLLIN) ||
2341 local_stream[i]->hangup_flush_done) {
4078b776
MD
2342 DBG("Normal read on fd %d", pollfd[i].fd);
2343 len = ctx->on_buffer_ready(local_stream[i], ctx);
2344 /* it's ok to have an unavailable sub-buffer */
b64403e3 2345 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
ab1027f4
DG
2346 /* Clean the stream and free it. */
2347 consumer_del_stream(local_stream[i], data_ht);
9617607b 2348 local_stream[i] = NULL;
4078b776
MD
2349 } else if (len > 0) {
2350 local_stream[i]->data_read = 1;
2351 }
2352 }
2353 }
2354
2355 /* Handle hangup and errors */
2356 for (i = 0; i < nb_fd; i++) {
9617607b
DG
2357 if (local_stream[i] == NULL) {
2358 continue;
2359 }
4078b776
MD
2360 if (!local_stream[i]->hangup_flush_done
2361 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2362 && (consumer_data.type == LTTNG_CONSUMER32_UST
2363 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2364 DBG("fd %d is hup|err|nval. Attempting flush and read.",
9617607b 2365 pollfd[i].fd);
4078b776
MD
2366 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2367 /* Attempt read again, for the data we just flushed. */
2368 local_stream[i]->data_read = 1;
2369 }
2370 /*
2371 * If the poll flag is HUP/ERR/NVAL and we have
2372 * read no data in this pass, we can remove the
2373 * stream from its hash table.
2374 */
2375 if ((pollfd[i].revents & POLLHUP)) {
2376 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2377 if (!local_stream[i]->data_read) {
43c34bc3 2378 consumer_del_stream(local_stream[i], data_ht);
9617607b 2379 local_stream[i] = NULL;
4078b776
MD
2380 num_hup++;
2381 }
2382 } else if (pollfd[i].revents & POLLERR) {
2383 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2384 if (!local_stream[i]->data_read) {
43c34bc3 2385 consumer_del_stream(local_stream[i], data_ht);
9617607b 2386 local_stream[i] = NULL;
4078b776
MD
2387 num_hup++;
2388 }
2389 } else if (pollfd[i].revents & POLLNVAL) {
2390 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2391 if (!local_stream[i]->data_read) {
43c34bc3 2392 consumer_del_stream(local_stream[i], data_ht);
9617607b 2393 local_stream[i] = NULL;
4078b776 2394 num_hup++;
3bd1e081
MD
2395 }
2396 }
9617607b
DG
2397 if (local_stream[i] != NULL) {
2398 local_stream[i]->data_read = 0;
2399 }
3bd1e081
MD
2400 }
2401 }
2402end:
2403 DBG("polling thread exiting");
2404 if (pollfd != NULL) {
2405 free(pollfd);
2406 pollfd = NULL;
2407 }
2408 if (local_stream != NULL) {
2409 free(local_stream);
2410 local_stream = NULL;
2411 }
fb3a43a9
DG
2412
2413 /*
2414 * Close the write side of the pipe so epoll_wait() in
7d980def
DG
2415 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2416 * read side of the pipe. If we close them both, epoll_wait strangely does
2417 * not return and could create a endless wait period if the pipe is the
2418 * only tracked fd in the poll set. The thread will take care of closing
2419 * the read side.
fb3a43a9
DG
2420 */
2421 close(ctx->consumer_metadata_pipe[1]);
fb3a43a9 2422
43c34bc3
DG
2423 if (data_ht) {
2424 destroy_data_stream_ht(data_ht);
2425 }
2426
e7b994a3 2427 rcu_unregister_thread();
3bd1e081
MD
2428 return NULL;
2429}
2430
2431/*
2432 * This thread listens on the consumerd socket and receives the file
2433 * descriptors from the session daemon.
2434 */
7d980def 2435void *consumer_thread_sessiond_poll(void *data)
3bd1e081
MD
2436{
2437 int sock, client_socket, ret;
2438 /*
2439 * structure to poll for incoming data on communication socket avoids
2440 * making blocking sockets.
2441 */
2442 struct pollfd consumer_sockpoll[2];
2443 struct lttng_consumer_local_data *ctx = data;
2444
e7b994a3
DG
2445 rcu_register_thread();
2446
3bd1e081
MD
2447 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
2448 unlink(ctx->consumer_command_sock_path);
2449 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
2450 if (client_socket < 0) {
2451 ERR("Cannot create command socket");
2452 goto end;
2453 }
2454
2455 ret = lttcomm_listen_unix_sock(client_socket);
2456 if (ret < 0) {
2457 goto end;
2458 }
2459
32258573 2460 DBG("Sending ready command to lttng-sessiond");
f73fabfd 2461 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
3bd1e081
MD
2462 /* return < 0 on error, but == 0 is not fatal */
2463 if (ret < 0) {
32258573 2464 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
2465 goto end;
2466 }
2467
2468 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
2469 if (ret < 0) {
7a57cf92 2470 PERROR("fcntl O_NONBLOCK");
3bd1e081
MD
2471 goto end;
2472 }
2473
2474 /* prepare the FDs to poll : to client socket and the should_quit pipe */
2475 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
2476 consumer_sockpoll[0].events = POLLIN | POLLPRI;
2477 consumer_sockpoll[1].fd = client_socket;
2478 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2479
2480 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2481 goto end;
2482 }
2483 DBG("Connection on client_socket");
2484
2485 /* Blocking call, waiting for transmission */
2486 sock = lttcomm_accept_unix_sock(client_socket);
2487 if (sock <= 0) {
2488 WARN("On accept");
2489 goto end;
2490 }
2491 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
2492 if (ret < 0) {
7a57cf92 2493 PERROR("fcntl O_NONBLOCK");
3bd1e081
MD
2494 goto end;
2495 }
2496
2497 /* update the polling structure to poll on the established socket */
2498 consumer_sockpoll[1].fd = sock;
2499 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2500
2501 while (1) {
2502 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2503 goto end;
2504 }
2505 DBG("Incoming command on sock");
2506 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
2507 if (ret == -ENOENT) {
2508 DBG("Received STOP command");
2509 goto end;
2510 }
4cbc1a04
DG
2511 if (ret <= 0) {
2512 /*
2513 * This could simply be a session daemon quitting. Don't output
2514 * ERR() here.
2515 */
2516 DBG("Communication interrupted on command socket");
3bd1e081
MD
2517 goto end;
2518 }
2519 if (consumer_quit) {
2520 DBG("consumer_thread_receive_fds received quit from signal");
2521 goto end;
2522 }
2523 DBG("received fds on sock");
2524 }
2525end:
2526 DBG("consumer_thread_receive_fds exiting");
2527
2528 /*
2529 * when all fds have hung up, the polling thread
2530 * can exit cleanly
2531 */
2532 consumer_quit = 1;
2533
04fdd819 2534 /*
c869f647 2535 * Notify the data poll thread to poll back again and test the
8994307f 2536 * consumer_quit state that we just set so to quit gracefully.
04fdd819 2537 */
8994307f 2538 notify_thread_pipe(ctx->consumer_data_pipe[1]);
c869f647 2539
e7b994a3 2540 rcu_unregister_thread();
3bd1e081
MD
2541 return NULL;
2542}
d41f73b7 2543
4078b776 2544ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
2545 struct lttng_consumer_local_data *ctx)
2546{
2547 switch (consumer_data.type) {
2548 case LTTNG_CONSUMER_KERNEL:
2549 return lttng_kconsumer_read_subbuffer(stream, ctx);
7753dea8
MD
2550 case LTTNG_CONSUMER32_UST:
2551 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
2552 return lttng_ustconsumer_read_subbuffer(stream, ctx);
2553 default:
2554 ERR("Unknown consumer_data type");
2555 assert(0);
2556 return -ENOSYS;
2557 }
2558}
2559
2560int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
2561{
2562 switch (consumer_data.type) {
2563 case LTTNG_CONSUMER_KERNEL:
2564 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
2565 case LTTNG_CONSUMER32_UST:
2566 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
2567 return lttng_ustconsumer_on_recv_stream(stream);
2568 default:
2569 ERR("Unknown consumer_data type");
2570 assert(0);
2571 return -ENOSYS;
2572 }
2573}
e4421fec
DG
2574
2575/*
2576 * Allocate and set consumer data hash tables.
2577 */
2578void lttng_consumer_init(void)
2579{
e4421fec 2580 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675 2581 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
53632229 2582 consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
43c34bc3
DG
2583
2584 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2585 assert(metadata_ht);
2586 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2587 assert(data_ht);
e4421fec 2588}
7735ef9e
DG
2589
2590/*
2591 * Process the ADD_RELAYD command receive by a consumer.
2592 *
2593 * This will create a relayd socket pair and add it to the relayd hash table.
2594 * The caller MUST acquire a RCU read side lock before calling it.
2595 */
2596int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
2597 struct lttng_consumer_local_data *ctx, int sock,
2598 struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
2599{
2600 int fd, ret = -1;
2601 struct consumer_relayd_sock_pair *relayd;
2602
2603 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
2604
2605 /* Get relayd reference if exists. */
2606 relayd = consumer_find_relayd(net_seq_idx);
2607 if (relayd == NULL) {
2608 /* Not found. Allocate one. */
2609 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
2610 if (relayd == NULL) {
f73fabfd 2611 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
7735ef9e
DG
2612 goto error;
2613 }
2614 }
2615
2616 /* Poll on consumer socket. */
2617 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2618 ret = -EINTR;
2619 goto error;
2620 }
2621
2622 /* Get relayd socket from session daemon */
2623 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
2624 if (ret != sizeof(fd)) {
f73fabfd 2625 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
7735ef9e
DG
2626 ret = -1;
2627 goto error;
2628 }
2629
2630 /* Copy socket information and received FD */
2631 switch (sock_type) {
2632 case LTTNG_STREAM_CONTROL:
2633 /* Copy received lttcomm socket */
2634 lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
2635 ret = lttcomm_create_sock(&relayd->control_sock);
2636 if (ret < 0) {
2637 goto error;
2638 }
2639
2640 /* Close the created socket fd which is useless */
2641 close(relayd->control_sock.fd);
2642
2643 /* Assign new file descriptor */
2644 relayd->control_sock.fd = fd;
2645 break;
2646 case LTTNG_STREAM_DATA:
2647 /* Copy received lttcomm socket */
2648 lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
2649 ret = lttcomm_create_sock(&relayd->data_sock);
2650 if (ret < 0) {
2651 goto error;
2652 }
2653
2654 /* Close the created socket fd which is useless */
2655 close(relayd->data_sock.fd);
2656
2657 /* Assign new file descriptor */
2658 relayd->data_sock.fd = fd;
2659 break;
2660 default:
2661 ERR("Unknown relayd socket type (%d)", sock_type);
2662 goto error;
2663 }
2664
2665 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
2666 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
2667 relayd->net_seq_idx, fd);
2668
2669 /*
2670 * Add relayd socket pair to consumer data hashtable. If object already
2671 * exists or on error, the function gracefully returns.
2672 */
d09e1200 2673 add_relayd(relayd);
7735ef9e
DG
2674
2675 /* All good! */
2676 ret = 0;
2677
2678error:
2679 return ret;
2680}
ca22feea 2681
4e9a4686
DG
2682/*
2683 * Try to lock the stream mutex.
2684 *
2685 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
2686 */
2687static int stream_try_lock(struct lttng_consumer_stream *stream)
2688{
2689 int ret;
2690
2691 assert(stream);
2692
2693 /*
2694 * Try to lock the stream mutex. On failure, we know that the stream is
2695 * being used else where hence there is data still being extracted.
2696 */
2697 ret = pthread_mutex_trylock(&stream->lock);
2698 if (ret) {
2699 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
2700 ret = 0;
2701 goto end;
2702 }
2703
2704 ret = 1;
2705
2706end:
2707 return ret;
2708}
2709
ca22feea
DG
2710/*
2711 * Check if for a given session id there is still data needed to be extract
2712 * from the buffers.
2713 *
6d805429 2714 * Return 1 if data is pending or else 0 meaning ready to be read.
ca22feea 2715 */
6d805429 2716int consumer_data_pending(uint64_t id)
ca22feea
DG
2717{
2718 int ret;
2719 struct lttng_ht_iter iter;
2720 struct lttng_ht *ht;
2721 struct lttng_consumer_stream *stream;
c8f59ee5 2722 struct consumer_relayd_sock_pair *relayd;
6d805429 2723 int (*data_pending)(struct lttng_consumer_stream *);
ca22feea 2724
6d805429 2725 DBG("Consumer data pending command on session id %" PRIu64, id);
ca22feea 2726
6f6eda74 2727 rcu_read_lock();
ca22feea
DG
2728 pthread_mutex_lock(&consumer_data.lock);
2729
2730 switch (consumer_data.type) {
2731 case LTTNG_CONSUMER_KERNEL:
6d805429 2732 data_pending = lttng_kconsumer_data_pending;
ca22feea
DG
2733 break;
2734 case LTTNG_CONSUMER32_UST:
2735 case LTTNG_CONSUMER64_UST:
6d805429 2736 data_pending = lttng_ustconsumer_data_pending;
ca22feea
DG
2737 break;
2738 default:
2739 ERR("Unknown consumer data type");
2740 assert(0);
2741 }
2742
2743 /* Ease our life a bit */
2744 ht = consumer_data.stream_list_ht;
2745
c8f59ee5 2746 cds_lfht_for_each_entry_duplicate(ht->ht,
b6314938 2747 ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
ca22feea
DG
2748 ht->match_fct, (void *)((unsigned long) id),
2749 &iter.iter, stream, node_session_id.node) {
4e9a4686
DG
2750 /* If this call fails, the stream is being used hence data pending. */
2751 ret = stream_try_lock(stream);
2752 if (!ret) {
6d805429 2753 goto data_not_pending;
ca22feea 2754 }
ca22feea 2755
4e9a4686
DG
2756 /*
2757 * A removed node from the hash table indicates that the stream has
2758 * been deleted thus having a guarantee that the buffers are closed
2759 * on the consumer side. However, data can still be transmitted
2760 * over the network so don't skip the relayd check.
2761 */
2762 ret = cds_lfht_is_node_deleted(&stream->node.node);
2763 if (!ret) {
2764 /* Check the stream if there is data in the buffers. */
6d805429
DG
2765 ret = data_pending(stream);
2766 if (ret == 1) {
4e9a4686 2767 pthread_mutex_unlock(&stream->lock);
6d805429 2768 goto data_not_pending;
4e9a4686
DG
2769 }
2770 }
2771
2772 /* Relayd check */
c8f59ee5
DG
2773 if (stream->net_seq_idx != -1) {
2774 relayd = consumer_find_relayd(stream->net_seq_idx);
4e9a4686
DG
2775 if (!relayd) {
2776 /*
2777 * At this point, if the relayd object is not available for the
2778 * given stream, it is because the relayd is being cleaned up
2779 * so every stream associated with it (for a session id value)
2780 * are or will be marked for deletion hence no data pending.
2781 */
2782 pthread_mutex_unlock(&stream->lock);
6d805429 2783 goto data_not_pending;
4e9a4686 2784 }
c8f59ee5 2785
c8f59ee5
DG
2786 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
2787 if (stream->metadata_flag) {
2788 ret = relayd_quiescent_control(&relayd->control_sock);
2789 } else {
6d805429 2790 ret = relayd_data_pending(&relayd->control_sock,
c8f59ee5
DG
2791 stream->relayd_stream_id, stream->next_net_seq_num);
2792 }
2793 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
6d805429 2794 if (ret == 1) {
4e9a4686 2795 pthread_mutex_unlock(&stream->lock);
6d805429 2796 goto data_not_pending;
c8f59ee5
DG
2797 }
2798 }
4e9a4686 2799 pthread_mutex_unlock(&stream->lock);
c8f59ee5 2800 }
ca22feea
DG
2801
2802 /*
2803 * Finding _no_ node in the hash table means that the stream(s) have been
2804 * removed thus data is guaranteed to be available for analysis from the
2805 * trace files. This is *only* true for local consumer and not network
2806 * streaming.
2807 */
2808
2809 /* Data is available to be read by a viewer. */
2810 pthread_mutex_unlock(&consumer_data.lock);
c8f59ee5 2811 rcu_read_unlock();
6d805429 2812 return 0;
ca22feea 2813
6d805429 2814data_not_pending:
ca22feea
DG
2815 /* Data is still being extracted from buffers. */
2816 pthread_mutex_unlock(&consumer_data.lock);
c8f59ee5 2817 rcu_read_unlock();
6d805429 2818 return 1;
ca22feea 2819}
This page took 0.168342 seconds and 4 git commands to generate.