16b9eb64f88ae006a742ade5d3d05b162d5bb0ef
[lttng-tools.git] / src / common / consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <inttypes.h>
31
32 #include <common/common.h>
33 #include <common/utils.h>
34 #include <common/compat/poll.h>
35 #include <common/kernel-ctl/kernel-ctl.h>
36 #include <common/sessiond-comm/relayd.h>
37 #include <common/sessiond-comm/sessiond-comm.h>
38 #include <common/kernel-consumer/kernel-consumer.h>
39 #include <common/relayd/relayd.h>
40 #include <common/ust-consumer/ust-consumer.h>
41
42 #include "consumer.h"
43
44 struct lttng_consumer_global_data consumer_data = {
45 .stream_count = 0,
46 .need_update = 1,
47 .type = LTTNG_CONSUMER_UNKNOWN,
48 };
49
50 /*
51 * Flag to inform the polling thread to quit when all fd hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds has hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
54 * polling threads.
55 */
56 volatile int consumer_quit;
57
58 /*
59 * The following two hash tables are visible by all threads which are separated
60 * in different source files.
61 *
62 * Global hash table containing respectively metadata and data streams. The
63 * stream element in this ht should only be updated by the metadata poll thread
64 * for the metadata and the data poll thread for the data.
65 */
66 struct lttng_ht *metadata_ht;
67 struct lttng_ht *data_ht;
68
69 /*
70 * Notify a thread pipe to poll back again. This usually means that some global
71 * state has changed so we just send back the thread in a poll wait call.
72 */
73 static void notify_thread_pipe(int wpipe)
74 {
75 int ret;
76
77 do {
78 struct lttng_consumer_stream *null_stream = NULL;
79
80 ret = write(wpipe, &null_stream, sizeof(null_stream));
81 } while (ret < 0 && errno == EINTR);
82 }
83
84 /*
85 * Find a stream. The consumer_data.lock must be locked during this
86 * call.
87 */
88 static struct lttng_consumer_stream *consumer_find_stream(int key,
89 struct lttng_ht *ht)
90 {
91 struct lttng_ht_iter iter;
92 struct lttng_ht_node_ulong *node;
93 struct lttng_consumer_stream *stream = NULL;
94
95 assert(ht);
96
97 /* Negative keys are lookup failures */
98 if (key < 0) {
99 return NULL;
100 }
101
102 rcu_read_lock();
103
104 lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
105 node = lttng_ht_iter_get_node_ulong(&iter);
106 if (node != NULL) {
107 stream = caa_container_of(node, struct lttng_consumer_stream, node);
108 }
109
110 rcu_read_unlock();
111
112 return stream;
113 }
114
115 void consumer_steal_stream_key(int key, struct lttng_ht *ht)
116 {
117 struct lttng_consumer_stream *stream;
118
119 rcu_read_lock();
120 stream = consumer_find_stream(key, ht);
121 if (stream) {
122 stream->key = -1;
123 /*
124 * We don't want the lookup to match, but we still need
125 * to iterate on this stream when iterating over the hash table. Just
126 * change the node key.
127 */
128 stream->node.key = -1;
129 }
130 rcu_read_unlock();
131 }
132
133 static struct lttng_consumer_channel *consumer_find_channel(int key)
134 {
135 struct lttng_ht_iter iter;
136 struct lttng_ht_node_ulong *node;
137 struct lttng_consumer_channel *channel = NULL;
138
139 /* Negative keys are lookup failures */
140 if (key < 0) {
141 return NULL;
142 }
143
144 rcu_read_lock();
145
146 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
147 &iter);
148 node = lttng_ht_iter_get_node_ulong(&iter);
149 if (node != NULL) {
150 channel = caa_container_of(node, struct lttng_consumer_channel, node);
151 }
152
153 rcu_read_unlock();
154
155 return channel;
156 }
157
158 static void consumer_steal_channel_key(int key)
159 {
160 struct lttng_consumer_channel *channel;
161
162 rcu_read_lock();
163 channel = consumer_find_channel(key);
164 if (channel) {
165 channel->key = -1;
166 /*
167 * We don't want the lookup to match, but we still need
168 * to iterate on this channel when iterating over the hash table. Just
169 * change the node key.
170 */
171 channel->node.key = -1;
172 }
173 rcu_read_unlock();
174 }
175
176 static
177 void consumer_free_stream(struct rcu_head *head)
178 {
179 struct lttng_ht_node_ulong *node =
180 caa_container_of(head, struct lttng_ht_node_ulong, head);
181 struct lttng_consumer_stream *stream =
182 caa_container_of(node, struct lttng_consumer_stream, node);
183
184 free(stream);
185 }
186
187 /*
188 * RCU protected relayd socket pair free.
189 */
190 static void consumer_rcu_free_relayd(struct rcu_head *head)
191 {
192 struct lttng_ht_node_ulong *node =
193 caa_container_of(head, struct lttng_ht_node_ulong, head);
194 struct consumer_relayd_sock_pair *relayd =
195 caa_container_of(node, struct consumer_relayd_sock_pair, node);
196
197 /*
198 * Close all sockets. This is done in the call RCU since we don't want the
199 * socket fds to be reassigned thus potentially creating bad state of the
200 * relayd object.
201 *
202 * We do not have to lock the control socket mutex here since at this stage
203 * there is no one referencing to this relayd object.
204 */
205 (void) relayd_close(&relayd->control_sock);
206 (void) relayd_close(&relayd->data_sock);
207
208 free(relayd);
209 }
210
211 /*
212 * Destroy and free relayd socket pair object.
213 *
214 * This function MUST be called with the consumer_data lock acquired.
215 */
216 static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
217 {
218 int ret;
219 struct lttng_ht_iter iter;
220
221 if (relayd == NULL) {
222 return;
223 }
224
225 DBG("Consumer destroy and close relayd socket pair");
226
227 iter.iter.node = &relayd->node.node;
228 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
229 if (ret != 0) {
230 /* We assume the relayd is being or is destroyed */
231 return;
232 }
233
234 /* RCU free() call */
235 call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
236 }
237
238 /*
239 * Update the end point status of all streams having the given network sequence
240 * index (relayd index).
241 *
242 * It's atomically set without having the stream mutex locked which is fine
243 * because we handle the write/read race with a pipe wakeup for each thread.
244 */
245 static void update_endpoint_status_by_netidx(int net_seq_idx,
246 enum consumer_endpoint_status status)
247 {
248 struct lttng_ht_iter iter;
249 struct lttng_consumer_stream *stream;
250
251 DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
252
253 rcu_read_lock();
254
255 /* Let's begin with metadata */
256 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
257 if (stream->net_seq_idx == net_seq_idx) {
258 uatomic_set(&stream->endpoint_status, status);
259 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
260 }
261 }
262
263 /* Follow up by the data streams */
264 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
265 if (stream->net_seq_idx == net_seq_idx) {
266 uatomic_set(&stream->endpoint_status, status);
267 DBG("Delete flag set to data stream %d", stream->wait_fd);
268 }
269 }
270 rcu_read_unlock();
271 }
272
273 /*
274 * Cleanup a relayd object by flagging every associated streams for deletion,
275 * destroying the object meaning removing it from the relayd hash table,
276 * closing the sockets and freeing the memory in a RCU call.
277 *
278 * If a local data context is available, notify the threads that the streams'
279 * state have changed.
280 */
281 static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
282 struct lttng_consumer_local_data *ctx)
283 {
284 int netidx;
285
286 assert(relayd);
287
288 DBG("Cleaning up relayd sockets");
289
290 /* Save the net sequence index before destroying the object */
291 netidx = relayd->net_seq_idx;
292
293 /*
294 * Delete the relayd from the relayd hash table, close the sockets and free
295 * the object in a RCU call.
296 */
297 destroy_relayd(relayd);
298
299 /* Set inactive endpoint to all streams */
300 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
301
302 /*
303 * With a local data context, notify the threads that the streams' state
304 * have changed. The write() action on the pipe acts as an "implicit"
305 * memory barrier ordering the updates of the end point status from the
306 * read of this status which happens AFTER receiving this notify.
307 */
308 if (ctx) {
309 notify_thread_pipe(ctx->consumer_data_pipe[1]);
310 notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
311 }
312 }
313
314 /*
315 * Flag a relayd socket pair for destruction. Destroy it if the refcount
316 * reaches zero.
317 *
318 * RCU read side lock MUST be aquired before calling this function.
319 */
320 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
321 {
322 assert(relayd);
323
324 /* Set destroy flag for this object */
325 uatomic_set(&relayd->destroy_flag, 1);
326
327 /* Destroy the relayd if refcount is 0 */
328 if (uatomic_read(&relayd->refcount) == 0) {
329 destroy_relayd(relayd);
330 }
331 }
332
333 /*
334 * Remove a stream from the global list protected by a mutex. This
335 * function is also responsible for freeing its data structures.
336 */
337 void consumer_del_stream(struct lttng_consumer_stream *stream,
338 struct lttng_ht *ht)
339 {
340 int ret;
341 struct lttng_ht_iter iter;
342 struct lttng_consumer_channel *free_chan = NULL;
343 struct consumer_relayd_sock_pair *relayd;
344
345 assert(stream);
346
347 DBG("Consumer del stream %d", stream->wait_fd);
348
349 if (ht == NULL) {
350 /* Means the stream was allocated but not successfully added */
351 goto free_stream;
352 }
353
354 pthread_mutex_lock(&stream->lock);
355 pthread_mutex_lock(&consumer_data.lock);
356
357 switch (consumer_data.type) {
358 case LTTNG_CONSUMER_KERNEL:
359 if (stream->mmap_base != NULL) {
360 ret = munmap(stream->mmap_base, stream->mmap_len);
361 if (ret != 0) {
362 PERROR("munmap");
363 }
364 }
365 break;
366 case LTTNG_CONSUMER32_UST:
367 case LTTNG_CONSUMER64_UST:
368 lttng_ustconsumer_del_stream(stream);
369 break;
370 default:
371 ERR("Unknown consumer_data type");
372 assert(0);
373 goto end;
374 }
375
376 rcu_read_lock();
377 iter.iter.node = &stream->node.node;
378 ret = lttng_ht_del(ht, &iter);
379 assert(!ret);
380
381 /* Remove node session id from the consumer_data stream ht */
382 iter.iter.node = &stream->node_session_id.node;
383 ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
384 assert(!ret);
385 rcu_read_unlock();
386
387 assert(consumer_data.stream_count > 0);
388 consumer_data.stream_count--;
389
390 if (stream->out_fd >= 0) {
391 ret = close(stream->out_fd);
392 if (ret) {
393 PERROR("close");
394 }
395 }
396 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
397 ret = close(stream->wait_fd);
398 if (ret) {
399 PERROR("close");
400 }
401 }
402 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
403 ret = close(stream->shm_fd);
404 if (ret) {
405 PERROR("close");
406 }
407 }
408
409 /* Check and cleanup relayd */
410 rcu_read_lock();
411 relayd = consumer_find_relayd(stream->net_seq_idx);
412 if (relayd != NULL) {
413 uatomic_dec(&relayd->refcount);
414 assert(uatomic_read(&relayd->refcount) >= 0);
415
416 /* Closing streams requires to lock the control socket. */
417 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
418 ret = relayd_send_close_stream(&relayd->control_sock,
419 stream->relayd_stream_id,
420 stream->next_net_seq_num - 1);
421 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
422 if (ret < 0) {
423 DBG("Unable to close stream on the relayd. Continuing");
424 /*
425 * Continue here. There is nothing we can do for the relayd.
426 * Chances are that the relayd has closed the socket so we just
427 * continue cleaning up.
428 */
429 }
430
431 /* Both conditions are met, we destroy the relayd. */
432 if (uatomic_read(&relayd->refcount) == 0 &&
433 uatomic_read(&relayd->destroy_flag)) {
434 destroy_relayd(relayd);
435 }
436 }
437 rcu_read_unlock();
438
439 uatomic_dec(&stream->chan->refcount);
440 if (!uatomic_read(&stream->chan->refcount)
441 && !uatomic_read(&stream->chan->nb_init_streams)) {
442 free_chan = stream->chan;
443 }
444
445 end:
446 consumer_data.need_update = 1;
447 pthread_mutex_unlock(&consumer_data.lock);
448 pthread_mutex_unlock(&stream->lock);
449
450 if (free_chan) {
451 consumer_del_channel(free_chan);
452 }
453
454 free_stream:
455 call_rcu(&stream->node.head, consumer_free_stream);
456 }
457
458 struct lttng_consumer_stream *consumer_allocate_stream(
459 int channel_key, int stream_key,
460 int shm_fd, int wait_fd,
461 enum lttng_consumer_stream_state state,
462 uint64_t mmap_len,
463 enum lttng_event_output output,
464 const char *path_name,
465 uid_t uid,
466 gid_t gid,
467 int net_index,
468 int metadata_flag,
469 uint64_t session_id,
470 int *alloc_ret)
471 {
472 struct lttng_consumer_stream *stream;
473
474 stream = zmalloc(sizeof(*stream));
475 if (stream == NULL) {
476 PERROR("malloc struct lttng_consumer_stream");
477 *alloc_ret = -ENOMEM;
478 goto end;
479 }
480
481 /*
482 * Get stream's channel reference. Needed when adding the stream to the
483 * global hash table.
484 */
485 stream->chan = consumer_find_channel(channel_key);
486 if (!stream->chan) {
487 *alloc_ret = -ENOENT;
488 ERR("Unable to find channel for stream %d", stream_key);
489 goto error;
490 }
491
492 stream->key = stream_key;
493 stream->shm_fd = shm_fd;
494 stream->wait_fd = wait_fd;
495 stream->out_fd = -1;
496 stream->out_fd_offset = 0;
497 stream->state = state;
498 stream->mmap_len = mmap_len;
499 stream->mmap_base = NULL;
500 stream->output = output;
501 stream->uid = uid;
502 stream->gid = gid;
503 stream->net_seq_idx = net_index;
504 stream->metadata_flag = metadata_flag;
505 stream->session_id = session_id;
506 strncpy(stream->path_name, path_name, sizeof(stream->path_name));
507 stream->path_name[sizeof(stream->path_name) - 1] = '\0';
508 pthread_mutex_init(&stream->lock, NULL);
509
510 /*
511 * Index differently the metadata node because the thread is using an
512 * internal hash table to match streams in the metadata_ht to the epoll set
513 * file descriptor.
514 */
515 if (metadata_flag) {
516 lttng_ht_node_init_ulong(&stream->node, stream->wait_fd);
517 } else {
518 lttng_ht_node_init_ulong(&stream->node, stream->key);
519 }
520
521 /* Init session id node with the stream session id */
522 lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
523
524 /*
525 * The cpu number is needed before using any ustctl_* actions. Ignored for
526 * the kernel so the value does not matter.
527 */
528 pthread_mutex_lock(&consumer_data.lock);
529 stream->cpu = stream->chan->cpucount++;
530 pthread_mutex_unlock(&consumer_data.lock);
531
532 DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
533 " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
534 stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
535 (unsigned long long) stream->mmap_len, stream->out_fd,
536 stream->net_seq_idx, stream->session_id);
537 return stream;
538
539 error:
540 free(stream);
541 end:
542 return NULL;
543 }
544
545 /*
546 * Add a stream to the global list protected by a mutex.
547 */
548 static int consumer_add_stream(struct lttng_consumer_stream *stream,
549 struct lttng_ht *ht)
550 {
551 int ret = 0;
552 struct consumer_relayd_sock_pair *relayd;
553
554 assert(stream);
555 assert(ht);
556
557 DBG3("Adding consumer stream %d", stream->key);
558
559 pthread_mutex_lock(&consumer_data.lock);
560 rcu_read_lock();
561
562 /* Steal stream identifier to avoid having streams with the same key */
563 consumer_steal_stream_key(stream->key, ht);
564
565 lttng_ht_add_unique_ulong(ht, &stream->node);
566
567 /*
568 * Add stream to the stream_list_ht of the consumer data. No need to steal
569 * the key since the HT does not use it and we allow to add redundant keys
570 * into this table.
571 */
572 lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
573
574 /* Check and cleanup relayd */
575 relayd = consumer_find_relayd(stream->net_seq_idx);
576 if (relayd != NULL) {
577 uatomic_inc(&relayd->refcount);
578 }
579
580 /* Update channel refcount once added without error(s). */
581 uatomic_inc(&stream->chan->refcount);
582
583 /*
584 * When nb_init_streams reaches 0, we don't need to trigger any action in
585 * terms of destroying the associated channel, because the action that
586 * causes the count to become 0 also causes a stream to be added. The
587 * channel deletion will thus be triggered by the following removal of this
588 * stream.
589 */
590 if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
591 uatomic_dec(&stream->chan->nb_init_streams);
592 }
593
594 /* Update consumer data once the node is inserted. */
595 consumer_data.stream_count++;
596 consumer_data.need_update = 1;
597
598 rcu_read_unlock();
599 pthread_mutex_unlock(&consumer_data.lock);
600
601 return ret;
602 }
603
604 /*
605 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
606 * be acquired before calling this.
607 */
608 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
609 {
610 int ret = 0;
611 struct lttng_ht_node_ulong *node;
612 struct lttng_ht_iter iter;
613
614 if (relayd == NULL) {
615 ret = -1;
616 goto end;
617 }
618
619 lttng_ht_lookup(consumer_data.relayd_ht,
620 (void *)((unsigned long) relayd->net_seq_idx), &iter);
621 node = lttng_ht_iter_get_node_ulong(&iter);
622 if (node != NULL) {
623 /* Relayd already exist. Ignore the insertion */
624 goto end;
625 }
626 lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
627
628 end:
629 return ret;
630 }
631
632 /*
633 * Allocate and return a consumer relayd socket.
634 */
635 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
636 int net_seq_idx)
637 {
638 struct consumer_relayd_sock_pair *obj = NULL;
639
640 /* Negative net sequence index is a failure */
641 if (net_seq_idx < 0) {
642 goto error;
643 }
644
645 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
646 if (obj == NULL) {
647 PERROR("zmalloc relayd sock");
648 goto error;
649 }
650
651 obj->net_seq_idx = net_seq_idx;
652 obj->refcount = 0;
653 obj->destroy_flag = 0;
654 lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
655 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
656
657 error:
658 return obj;
659 }
660
661 /*
662 * Find a relayd socket pair in the global consumer data.
663 *
664 * Return the object if found else NULL.
665 * RCU read-side lock must be held across this call and while using the
666 * returned object.
667 */
668 struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
669 {
670 struct lttng_ht_iter iter;
671 struct lttng_ht_node_ulong *node;
672 struct consumer_relayd_sock_pair *relayd = NULL;
673
674 /* Negative keys are lookup failures */
675 if (key < 0) {
676 goto error;
677 }
678
679 lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
680 &iter);
681 node = lttng_ht_iter_get_node_ulong(&iter);
682 if (node != NULL) {
683 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
684 }
685
686 error:
687 return relayd;
688 }
689
690 /*
691 * Handle stream for relayd transmission if the stream applies for network
692 * streaming where the net sequence index is set.
693 *
694 * Return destination file descriptor or negative value on error.
695 */
696 static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
697 size_t data_size, unsigned long padding,
698 struct consumer_relayd_sock_pair *relayd)
699 {
700 int outfd = -1, ret;
701 struct lttcomm_relayd_data_hdr data_hdr;
702
703 /* Safety net */
704 assert(stream);
705 assert(relayd);
706
707 /* Reset data header */
708 memset(&data_hdr, 0, sizeof(data_hdr));
709
710 if (stream->metadata_flag) {
711 /* Caller MUST acquire the relayd control socket lock */
712 ret = relayd_send_metadata(&relayd->control_sock, data_size);
713 if (ret < 0) {
714 goto error;
715 }
716
717 /* Metadata are always sent on the control socket. */
718 outfd = relayd->control_sock.fd;
719 } else {
720 /* Set header with stream information */
721 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
722 data_hdr.data_size = htobe32(data_size);
723 data_hdr.padding_size = htobe32(padding);
724 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
725 /* Other fields are zeroed previously */
726
727 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
728 sizeof(data_hdr));
729 if (ret < 0) {
730 goto error;
731 }
732
733 /* Set to go on data socket */
734 outfd = relayd->data_sock.fd;
735 }
736
737 error:
738 return outfd;
739 }
740
741 static
742 void consumer_free_channel(struct rcu_head *head)
743 {
744 struct lttng_ht_node_ulong *node =
745 caa_container_of(head, struct lttng_ht_node_ulong, head);
746 struct lttng_consumer_channel *channel =
747 caa_container_of(node, struct lttng_consumer_channel, node);
748
749 free(channel);
750 }
751
752 /*
753 * Remove a channel from the global list protected by a mutex. This
754 * function is also responsible for freeing its data structures.
755 */
756 void consumer_del_channel(struct lttng_consumer_channel *channel)
757 {
758 int ret;
759 struct lttng_ht_iter iter;
760
761 pthread_mutex_lock(&consumer_data.lock);
762
763 switch (consumer_data.type) {
764 case LTTNG_CONSUMER_KERNEL:
765 break;
766 case LTTNG_CONSUMER32_UST:
767 case LTTNG_CONSUMER64_UST:
768 lttng_ustconsumer_del_channel(channel);
769 break;
770 default:
771 ERR("Unknown consumer_data type");
772 assert(0);
773 goto end;
774 }
775
776 rcu_read_lock();
777 iter.iter.node = &channel->node.node;
778 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
779 assert(!ret);
780 rcu_read_unlock();
781
782 if (channel->mmap_base != NULL) {
783 ret = munmap(channel->mmap_base, channel->mmap_len);
784 if (ret != 0) {
785 PERROR("munmap");
786 }
787 }
788 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
789 ret = close(channel->wait_fd);
790 if (ret) {
791 PERROR("close");
792 }
793 }
794 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
795 ret = close(channel->shm_fd);
796 if (ret) {
797 PERROR("close");
798 }
799 }
800
801 call_rcu(&channel->node.head, consumer_free_channel);
802 end:
803 pthread_mutex_unlock(&consumer_data.lock);
804 }
805
806 struct lttng_consumer_channel *consumer_allocate_channel(
807 int channel_key,
808 int shm_fd, int wait_fd,
809 uint64_t mmap_len,
810 uint64_t max_sb_size,
811 unsigned int nb_init_streams)
812 {
813 struct lttng_consumer_channel *channel;
814 int ret;
815
816 channel = zmalloc(sizeof(*channel));
817 if (channel == NULL) {
818 PERROR("malloc struct lttng_consumer_channel");
819 goto end;
820 }
821 channel->key = channel_key;
822 channel->shm_fd = shm_fd;
823 channel->wait_fd = wait_fd;
824 channel->mmap_len = mmap_len;
825 channel->max_sb_size = max_sb_size;
826 channel->refcount = 0;
827 channel->nb_init_streams = nb_init_streams;
828 lttng_ht_node_init_ulong(&channel->node, channel->key);
829
830 switch (consumer_data.type) {
831 case LTTNG_CONSUMER_KERNEL:
832 channel->mmap_base = NULL;
833 channel->mmap_len = 0;
834 break;
835 case LTTNG_CONSUMER32_UST:
836 case LTTNG_CONSUMER64_UST:
837 ret = lttng_ustconsumer_allocate_channel(channel);
838 if (ret) {
839 free(channel);
840 return NULL;
841 }
842 break;
843 default:
844 ERR("Unknown consumer_data type");
845 assert(0);
846 goto end;
847 }
848 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
849 channel->key, channel->shm_fd, channel->wait_fd,
850 (unsigned long long) channel->mmap_len,
851 (unsigned long long) channel->max_sb_size);
852 end:
853 return channel;
854 }
855
856 /*
857 * Add a channel to the global list protected by a mutex.
858 */
859 int consumer_add_channel(struct lttng_consumer_channel *channel)
860 {
861 struct lttng_ht_node_ulong *node;
862 struct lttng_ht_iter iter;
863
864 pthread_mutex_lock(&consumer_data.lock);
865 /* Steal channel identifier, for UST */
866 consumer_steal_channel_key(channel->key);
867 rcu_read_lock();
868
869 lttng_ht_lookup(consumer_data.channel_ht,
870 (void *)((unsigned long) channel->key), &iter);
871 node = lttng_ht_iter_get_node_ulong(&iter);
872 if (node != NULL) {
873 /* Channel already exist. Ignore the insertion */
874 goto end;
875 }
876
877 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
878
879 end:
880 rcu_read_unlock();
881 pthread_mutex_unlock(&consumer_data.lock);
882
883 return 0;
884 }
885
886 /*
887 * Allocate the pollfd structure and the local view of the out fds to avoid
888 * doing a lookup in the linked list and concurrency issues when writing is
889 * needed. Called with consumer_data.lock held.
890 *
891 * Returns the number of fds in the structures.
892 */
893 static int consumer_update_poll_array(
894 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
895 struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
896 {
897 int i = 0;
898 struct lttng_ht_iter iter;
899 struct lttng_consumer_stream *stream;
900
901 DBG("Updating poll fd array");
902 rcu_read_lock();
903 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
904 /*
905 * Only active streams with an active end point can be added to the
906 * poll set and local stream storage of the thread.
907 *
908 * There is a potential race here for endpoint_status to be updated
909 * just after the check. However, this is OK since the stream(s) will
910 * be deleted once the thread is notified that the end point state has
911 * changed where this function will be called back again.
912 */
913 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
914 stream->endpoint_status) {
915 continue;
916 }
917 DBG("Active FD %d", stream->wait_fd);
918 (*pollfd)[i].fd = stream->wait_fd;
919 (*pollfd)[i].events = POLLIN | POLLPRI;
920 local_stream[i] = stream;
921 i++;
922 }
923 rcu_read_unlock();
924
925 /*
926 * Insert the consumer_data_pipe at the end of the array and don't
927 * increment i so nb_fd is the number of real FD.
928 */
929 (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
930 (*pollfd)[i].events = POLLIN | POLLPRI;
931 return i;
932 }
933
934 /*
935 * Poll on the should_quit pipe and the command socket return -1 on error and
936 * should exit, 0 if data is available on the command socket
937 */
938 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
939 {
940 int num_rdy;
941
942 restart:
943 num_rdy = poll(consumer_sockpoll, 2, -1);
944 if (num_rdy == -1) {
945 /*
946 * Restart interrupted system call.
947 */
948 if (errno == EINTR) {
949 goto restart;
950 }
951 PERROR("Poll error");
952 goto exit;
953 }
954 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
955 DBG("consumer_should_quit wake up");
956 goto exit;
957 }
958 return 0;
959
960 exit:
961 return -1;
962 }
963
964 /*
965 * Set the error socket.
966 */
967 void lttng_consumer_set_error_sock(
968 struct lttng_consumer_local_data *ctx, int sock)
969 {
970 ctx->consumer_error_socket = sock;
971 }
972
973 /*
974 * Set the command socket path.
975 */
976 void lttng_consumer_set_command_sock_path(
977 struct lttng_consumer_local_data *ctx, char *sock)
978 {
979 ctx->consumer_command_sock_path = sock;
980 }
981
982 /*
983 * Send return code to the session daemon.
984 * If the socket is not defined, we return 0, it is not a fatal error
985 */
986 int lttng_consumer_send_error(
987 struct lttng_consumer_local_data *ctx, int cmd)
988 {
989 if (ctx->consumer_error_socket > 0) {
990 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
991 sizeof(enum lttcomm_sessiond_command));
992 }
993
994 return 0;
995 }
996
997 /*
998 * Close all the tracefiles and stream fds, should be called when all instances
999 * are destroyed.
1000 */
1001 void lttng_consumer_cleanup(void)
1002 {
1003 struct lttng_ht_iter iter;
1004 struct lttng_ht_node_ulong *node;
1005
1006 rcu_read_lock();
1007
1008 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
1009 node) {
1010 struct lttng_consumer_channel *channel =
1011 caa_container_of(node, struct lttng_consumer_channel, node);
1012 consumer_del_channel(channel);
1013 }
1014
1015 rcu_read_unlock();
1016
1017 lttng_ht_destroy(consumer_data.channel_ht);
1018 }
1019
1020 /*
1021 * Called from signal handler.
1022 */
1023 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1024 {
1025 int ret;
1026 consumer_quit = 1;
1027 do {
1028 ret = write(ctx->consumer_should_quit[1], "4", 1);
1029 } while (ret < 0 && errno == EINTR);
1030 if (ret < 0) {
1031 PERROR("write consumer quit");
1032 }
1033
1034 DBG("Consumer flag that it should quit");
1035 }
1036
1037 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
1038 off_t orig_offset)
1039 {
1040 int outfd = stream->out_fd;
1041
1042 /*
1043 * This does a blocking write-and-wait on any page that belongs to the
1044 * subbuffer prior to the one we just wrote.
1045 * Don't care about error values, as these are just hints and ways to
1046 * limit the amount of page cache used.
1047 */
1048 if (orig_offset < stream->chan->max_sb_size) {
1049 return;
1050 }
1051 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
1052 stream->chan->max_sb_size,
1053 SYNC_FILE_RANGE_WAIT_BEFORE
1054 | SYNC_FILE_RANGE_WRITE
1055 | SYNC_FILE_RANGE_WAIT_AFTER);
1056 /*
1057 * Give hints to the kernel about how we access the file:
1058 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1059 * we write it.
1060 *
1061 * We need to call fadvise again after the file grows because the
1062 * kernel does not seem to apply fadvise to non-existing parts of the
1063 * file.
1064 *
1065 * Call fadvise _after_ having waited for the page writeback to
1066 * complete because the dirty page writeback semantic is not well
1067 * defined. So it can be expected to lead to lower throughput in
1068 * streaming.
1069 */
1070 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
1071 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
1072 }
1073
1074 /*
1075 * Initialise the necessary environnement :
1076 * - create a new context
1077 * - create the poll_pipe
1078 * - create the should_quit pipe (for signal handler)
1079 * - create the thread pipe (for splice)
1080 *
1081 * Takes a function pointer as argument, this function is called when data is
1082 * available on a buffer. This function is responsible to do the
1083 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1084 * buffer configuration and then kernctl_put_next_subbuf at the end.
1085 *
1086 * Returns a pointer to the new context or NULL on error.
1087 */
1088 struct lttng_consumer_local_data *lttng_consumer_create(
1089 enum lttng_consumer_type type,
1090 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
1091 struct lttng_consumer_local_data *ctx),
1092 int (*recv_channel)(struct lttng_consumer_channel *channel),
1093 int (*recv_stream)(struct lttng_consumer_stream *stream),
1094 int (*update_stream)(int stream_key, uint32_t state))
1095 {
1096 int ret, i;
1097 struct lttng_consumer_local_data *ctx;
1098
1099 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
1100 consumer_data.type == type);
1101 consumer_data.type = type;
1102
1103 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
1104 if (ctx == NULL) {
1105 PERROR("allocating context");
1106 goto error;
1107 }
1108
1109 ctx->consumer_error_socket = -1;
1110 /* assign the callbacks */
1111 ctx->on_buffer_ready = buffer_ready;
1112 ctx->on_recv_channel = recv_channel;
1113 ctx->on_recv_stream = recv_stream;
1114 ctx->on_update_stream = update_stream;
1115
1116 ret = pipe(ctx->consumer_data_pipe);
1117 if (ret < 0) {
1118 PERROR("Error creating poll pipe");
1119 goto error_poll_pipe;
1120 }
1121
1122 /* set read end of the pipe to non-blocking */
1123 ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
1124 if (ret < 0) {
1125 PERROR("fcntl O_NONBLOCK");
1126 goto error_poll_fcntl;
1127 }
1128
1129 /* set write end of the pipe to non-blocking */
1130 ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
1131 if (ret < 0) {
1132 PERROR("fcntl O_NONBLOCK");
1133 goto error_poll_fcntl;
1134 }
1135
1136 ret = pipe(ctx->consumer_should_quit);
1137 if (ret < 0) {
1138 PERROR("Error creating recv pipe");
1139 goto error_quit_pipe;
1140 }
1141
1142 ret = pipe(ctx->consumer_thread_pipe);
1143 if (ret < 0) {
1144 PERROR("Error creating thread pipe");
1145 goto error_thread_pipe;
1146 }
1147
1148 ret = utils_create_pipe(ctx->consumer_metadata_pipe);
1149 if (ret < 0) {
1150 goto error_metadata_pipe;
1151 }
1152
1153 ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
1154 if (ret < 0) {
1155 goto error_splice_pipe;
1156 }
1157
1158 return ctx;
1159
1160 error_splice_pipe:
1161 utils_close_pipe(ctx->consumer_metadata_pipe);
1162 error_metadata_pipe:
1163 utils_close_pipe(ctx->consumer_thread_pipe);
1164 error_thread_pipe:
1165 for (i = 0; i < 2; i++) {
1166 int err;
1167
1168 err = close(ctx->consumer_should_quit[i]);
1169 if (err) {
1170 PERROR("close");
1171 }
1172 }
1173 error_poll_fcntl:
1174 error_quit_pipe:
1175 for (i = 0; i < 2; i++) {
1176 int err;
1177
1178 err = close(ctx->consumer_data_pipe[i]);
1179 if (err) {
1180 PERROR("close");
1181 }
1182 }
1183 error_poll_pipe:
1184 free(ctx);
1185 error:
1186 return NULL;
1187 }
1188
1189 /*
1190 * Close all fds associated with the instance and free the context.
1191 */
1192 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1193 {
1194 int ret;
1195
1196 DBG("Consumer destroying it. Closing everything.");
1197
1198 ret = close(ctx->consumer_error_socket);
1199 if (ret) {
1200 PERROR("close");
1201 }
1202 ret = close(ctx->consumer_thread_pipe[0]);
1203 if (ret) {
1204 PERROR("close");
1205 }
1206 ret = close(ctx->consumer_thread_pipe[1]);
1207 if (ret) {
1208 PERROR("close");
1209 }
1210 ret = close(ctx->consumer_data_pipe[0]);
1211 if (ret) {
1212 PERROR("close");
1213 }
1214 ret = close(ctx->consumer_data_pipe[1]);
1215 if (ret) {
1216 PERROR("close");
1217 }
1218 ret = close(ctx->consumer_should_quit[0]);
1219 if (ret) {
1220 PERROR("close");
1221 }
1222 ret = close(ctx->consumer_should_quit[1]);
1223 if (ret) {
1224 PERROR("close");
1225 }
1226 utils_close_pipe(ctx->consumer_splice_metadata_pipe);
1227
1228 unlink(ctx->consumer_command_sock_path);
1229 free(ctx);
1230 }
1231
1232 /*
1233 * Write the metadata stream id on the specified file descriptor.
1234 */
1235 static int write_relayd_metadata_id(int fd,
1236 struct lttng_consumer_stream *stream,
1237 struct consumer_relayd_sock_pair *relayd,
1238 unsigned long padding)
1239 {
1240 int ret;
1241 struct lttcomm_relayd_metadata_payload hdr;
1242
1243 hdr.stream_id = htobe64(stream->relayd_stream_id);
1244 hdr.padding_size = htobe32(padding);
1245 do {
1246 ret = write(fd, (void *) &hdr, sizeof(hdr));
1247 } while (ret < 0 && errno == EINTR);
1248 if (ret < 0) {
1249 PERROR("write metadata stream id");
1250 goto end;
1251 }
1252 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1253 stream->relayd_stream_id, padding);
1254
1255 end:
1256 return ret;
1257 }
1258
1259 /*
1260 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1261 * core function for writing trace buffers to either the local filesystem or
1262 * the network.
1263 *
1264 * Careful review MUST be put if any changes occur!
1265 *
1266 * Returns the number of bytes written
1267 */
1268 ssize_t lttng_consumer_on_read_subbuffer_mmap(
1269 struct lttng_consumer_local_data *ctx,
1270 struct lttng_consumer_stream *stream, unsigned long len,
1271 unsigned long padding)
1272 {
1273 unsigned long mmap_offset;
1274 ssize_t ret = 0, written = 0;
1275 off_t orig_offset = stream->out_fd_offset;
1276 /* Default is on the disk */
1277 int outfd = stream->out_fd;
1278 struct consumer_relayd_sock_pair *relayd = NULL;
1279 unsigned int relayd_hang_up = 0;
1280
1281 /* RCU lock for the relayd pointer */
1282 rcu_read_lock();
1283
1284 pthread_mutex_lock(&stream->lock);
1285
1286 /* Flag that the current stream if set for network streaming. */
1287 if (stream->net_seq_idx != -1) {
1288 relayd = consumer_find_relayd(stream->net_seq_idx);
1289 if (relayd == NULL) {
1290 goto end;
1291 }
1292 }
1293
1294 /* get the offset inside the fd to mmap */
1295 switch (consumer_data.type) {
1296 case LTTNG_CONSUMER_KERNEL:
1297 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
1298 break;
1299 case LTTNG_CONSUMER32_UST:
1300 case LTTNG_CONSUMER64_UST:
1301 ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
1302 stream->buf, &mmap_offset);
1303 break;
1304 default:
1305 ERR("Unknown consumer_data type");
1306 assert(0);
1307 }
1308 if (ret != 0) {
1309 errno = -ret;
1310 PERROR("tracer ctl get_mmap_read_offset");
1311 written = ret;
1312 goto end;
1313 }
1314
1315 /* Handle stream on the relayd if the output is on the network */
1316 if (relayd) {
1317 unsigned long netlen = len;
1318
1319 /*
1320 * Lock the control socket for the complete duration of the function
1321 * since from this point on we will use the socket.
1322 */
1323 if (stream->metadata_flag) {
1324 /* Metadata requires the control socket. */
1325 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1326 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
1327 }
1328
1329 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
1330 if (ret >= 0) {
1331 /* Use the returned socket. */
1332 outfd = ret;
1333
1334 /* Write metadata stream id before payload */
1335 if (stream->metadata_flag) {
1336 ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
1337 if (ret < 0) {
1338 written = ret;
1339 /* Socket operation failed. We consider the relayd dead */
1340 if (ret == -EPIPE || ret == -EINVAL) {
1341 relayd_hang_up = 1;
1342 goto write_error;
1343 }
1344 goto end;
1345 }
1346 }
1347 } else {
1348 /* Socket operation failed. We consider the relayd dead */
1349 if (ret == -EPIPE || ret == -EINVAL) {
1350 relayd_hang_up = 1;
1351 goto write_error;
1352 }
1353 /* Else, use the default set before which is the filesystem. */
1354 }
1355 } else {
1356 /* No streaming, we have to set the len with the full padding */
1357 len += padding;
1358 }
1359
1360 while (len > 0) {
1361 do {
1362 ret = write(outfd, stream->mmap_base + mmap_offset, len);
1363 } while (ret < 0 && errno == EINTR);
1364 DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
1365 if (ret < 0) {
1366 PERROR("Error in file write");
1367 if (written == 0) {
1368 written = ret;
1369 }
1370 /* Socket operation failed. We consider the relayd dead */
1371 if (errno == EPIPE || errno == EINVAL) {
1372 relayd_hang_up = 1;
1373 goto write_error;
1374 }
1375 goto end;
1376 } else if (ret > len) {
1377 PERROR("Error in file write (ret %zd > len %lu)", ret, len);
1378 written += ret;
1379 goto end;
1380 } else {
1381 len -= ret;
1382 mmap_offset += ret;
1383 }
1384
1385 /* This call is useless on a socket so better save a syscall. */
1386 if (!relayd) {
1387 /* This won't block, but will start writeout asynchronously */
1388 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
1389 SYNC_FILE_RANGE_WRITE);
1390 stream->out_fd_offset += ret;
1391 }
1392 written += ret;
1393 }
1394 lttng_consumer_sync_trace_file(stream, orig_offset);
1395
1396 write_error:
1397 /*
1398 * This is a special case that the relayd has closed its socket. Let's
1399 * cleanup the relayd object and all associated streams.
1400 */
1401 if (relayd && relayd_hang_up) {
1402 cleanup_relayd(relayd, ctx);
1403 }
1404
1405 end:
1406 /* Unlock only if ctrl socket used */
1407 if (relayd && stream->metadata_flag) {
1408 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1409 }
1410 pthread_mutex_unlock(&stream->lock);
1411
1412 rcu_read_unlock();
1413 return written;
1414 }
1415
1416 /*
1417 * Splice the data from the ring buffer to the tracefile.
1418 *
1419 * Returns the number of bytes spliced.
1420 */
1421 ssize_t lttng_consumer_on_read_subbuffer_splice(
1422 struct lttng_consumer_local_data *ctx,
1423 struct lttng_consumer_stream *stream, unsigned long len,
1424 unsigned long padding)
1425 {
1426 ssize_t ret = 0, written = 0, ret_splice = 0;
1427 loff_t offset = 0;
1428 off_t orig_offset = stream->out_fd_offset;
1429 int fd = stream->wait_fd;
1430 /* Default is on the disk */
1431 int outfd = stream->out_fd;
1432 struct consumer_relayd_sock_pair *relayd = NULL;
1433 int *splice_pipe;
1434 unsigned int relayd_hang_up = 0;
1435
1436 switch (consumer_data.type) {
1437 case LTTNG_CONSUMER_KERNEL:
1438 break;
1439 case LTTNG_CONSUMER32_UST:
1440 case LTTNG_CONSUMER64_UST:
1441 /* Not supported for user space tracing */
1442 return -ENOSYS;
1443 default:
1444 ERR("Unknown consumer_data type");
1445 assert(0);
1446 }
1447
1448 /* RCU lock for the relayd pointer */
1449 rcu_read_lock();
1450
1451 pthread_mutex_lock(&stream->lock);
1452
1453 /* Flag that the current stream if set for network streaming. */
1454 if (stream->net_seq_idx != -1) {
1455 relayd = consumer_find_relayd(stream->net_seq_idx);
1456 if (relayd == NULL) {
1457 goto end;
1458 }
1459 }
1460
1461 /*
1462 * Choose right pipe for splice. Metadata and trace data are handled by
1463 * different threads hence the use of two pipes in order not to race or
1464 * corrupt the written data.
1465 */
1466 if (stream->metadata_flag) {
1467 splice_pipe = ctx->consumer_splice_metadata_pipe;
1468 } else {
1469 splice_pipe = ctx->consumer_thread_pipe;
1470 }
1471
1472 /* Write metadata stream id before payload */
1473 if (relayd) {
1474 int total_len = len;
1475
1476 if (stream->metadata_flag) {
1477 /*
1478 * Lock the control socket for the complete duration of the function
1479 * since from this point on we will use the socket.
1480 */
1481 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1482
1483 ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
1484 padding);
1485 if (ret < 0) {
1486 written = ret;
1487 /* Socket operation failed. We consider the relayd dead */
1488 if (ret == -EBADF) {
1489 WARN("Remote relayd disconnected. Stopping");
1490 relayd_hang_up = 1;
1491 goto write_error;
1492 }
1493 goto end;
1494 }
1495
1496 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1497 }
1498
1499 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1500 if (ret >= 0) {
1501 /* Use the returned socket. */
1502 outfd = ret;
1503 } else {
1504 /* Socket operation failed. We consider the relayd dead */
1505 if (ret == -EBADF) {
1506 WARN("Remote relayd disconnected. Stopping");
1507 relayd_hang_up = 1;
1508 goto write_error;
1509 }
1510 goto end;
1511 }
1512 } else {
1513 /* No streaming, we have to set the len with the full padding */
1514 len += padding;
1515 }
1516
1517 while (len > 0) {
1518 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1519 (unsigned long)offset, len, fd, splice_pipe[1]);
1520 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
1521 SPLICE_F_MOVE | SPLICE_F_MORE);
1522 DBG("splice chan to pipe, ret %zd", ret_splice);
1523 if (ret_splice < 0) {
1524 PERROR("Error in relay splice");
1525 if (written == 0) {
1526 written = ret_splice;
1527 }
1528 ret = errno;
1529 goto splice_error;
1530 }
1531
1532 /* Handle stream on the relayd if the output is on the network */
1533 if (relayd) {
1534 if (stream->metadata_flag) {
1535 size_t metadata_payload_size =
1536 sizeof(struct lttcomm_relayd_metadata_payload);
1537
1538 /* Update counter to fit the spliced data */
1539 ret_splice += metadata_payload_size;
1540 len += metadata_payload_size;
1541 /*
1542 * We do this so the return value can match the len passed as
1543 * argument to this function.
1544 */
1545 written -= metadata_payload_size;
1546 }
1547 }
1548
1549 /* Splice data out */
1550 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
1551 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
1552 DBG("Consumer splice pipe to file, ret %zd", ret_splice);
1553 if (ret_splice < 0) {
1554 PERROR("Error in file splice");
1555 if (written == 0) {
1556 written = ret_splice;
1557 }
1558 /* Socket operation failed. We consider the relayd dead */
1559 if (errno == EBADF || errno == EPIPE) {
1560 WARN("Remote relayd disconnected. Stopping");
1561 relayd_hang_up = 1;
1562 goto write_error;
1563 }
1564 ret = errno;
1565 goto splice_error;
1566 } else if (ret_splice > len) {
1567 errno = EINVAL;
1568 PERROR("Wrote more data than requested %zd (len: %lu)",
1569 ret_splice, len);
1570 written += ret_splice;
1571 ret = errno;
1572 goto splice_error;
1573 }
1574 len -= ret_splice;
1575
1576 /* This call is useless on a socket so better save a syscall. */
1577 if (!relayd) {
1578 /* This won't block, but will start writeout asynchronously */
1579 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
1580 SYNC_FILE_RANGE_WRITE);
1581 stream->out_fd_offset += ret_splice;
1582 }
1583 written += ret_splice;
1584 }
1585 lttng_consumer_sync_trace_file(stream, orig_offset);
1586
1587 ret = ret_splice;
1588
1589 goto end;
1590
1591 write_error:
1592 /*
1593 * This is a special case that the relayd has closed its socket. Let's
1594 * cleanup the relayd object and all associated streams.
1595 */
1596 if (relayd && relayd_hang_up) {
1597 cleanup_relayd(relayd, ctx);
1598 /* Skip splice error so the consumer does not fail */
1599 goto end;
1600 }
1601
1602 splice_error:
1603 /* send the appropriate error description to sessiond */
1604 switch (ret) {
1605 case EINVAL:
1606 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
1607 break;
1608 case ENOMEM:
1609 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
1610 break;
1611 case ESPIPE:
1612 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
1613 break;
1614 }
1615
1616 end:
1617 if (relayd && stream->metadata_flag) {
1618 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1619 }
1620 pthread_mutex_unlock(&stream->lock);
1621
1622 rcu_read_unlock();
1623 return written;
1624 }
1625
1626 /*
1627 * Take a snapshot for a specific fd
1628 *
1629 * Returns 0 on success, < 0 on error
1630 */
1631 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
1632 struct lttng_consumer_stream *stream)
1633 {
1634 switch (consumer_data.type) {
1635 case LTTNG_CONSUMER_KERNEL:
1636 return lttng_kconsumer_take_snapshot(ctx, stream);
1637 case LTTNG_CONSUMER32_UST:
1638 case LTTNG_CONSUMER64_UST:
1639 return lttng_ustconsumer_take_snapshot(ctx, stream);
1640 default:
1641 ERR("Unknown consumer_data type");
1642 assert(0);
1643 return -ENOSYS;
1644 }
1645
1646 }
1647
1648 /*
1649 * Get the produced position
1650 *
1651 * Returns 0 on success, < 0 on error
1652 */
1653 int lttng_consumer_get_produced_snapshot(
1654 struct lttng_consumer_local_data *ctx,
1655 struct lttng_consumer_stream *stream,
1656 unsigned long *pos)
1657 {
1658 switch (consumer_data.type) {
1659 case LTTNG_CONSUMER_KERNEL:
1660 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
1661 case LTTNG_CONSUMER32_UST:
1662 case LTTNG_CONSUMER64_UST:
1663 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
1664 default:
1665 ERR("Unknown consumer_data type");
1666 assert(0);
1667 return -ENOSYS;
1668 }
1669 }
1670
1671 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1672 int sock, struct pollfd *consumer_sockpoll)
1673 {
1674 switch (consumer_data.type) {
1675 case LTTNG_CONSUMER_KERNEL:
1676 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1677 case LTTNG_CONSUMER32_UST:
1678 case LTTNG_CONSUMER64_UST:
1679 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1680 default:
1681 ERR("Unknown consumer_data type");
1682 assert(0);
1683 return -ENOSYS;
1684 }
1685 }
1686
1687 /*
1688 * Iterate over all streams of the hashtable and free them properly.
1689 *
1690 * WARNING: *MUST* be used with data stream only.
1691 */
1692 static void destroy_data_stream_ht(struct lttng_ht *ht)
1693 {
1694 int ret;
1695 struct lttng_ht_iter iter;
1696 struct lttng_consumer_stream *stream;
1697
1698 if (ht == NULL) {
1699 return;
1700 }
1701
1702 rcu_read_lock();
1703 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1704 ret = lttng_ht_del(ht, &iter);
1705 assert(!ret);
1706
1707 call_rcu(&stream->node.head, consumer_free_stream);
1708 }
1709 rcu_read_unlock();
1710
1711 lttng_ht_destroy(ht);
1712 }
1713
1714 /*
1715 * Iterate over all streams of the hashtable and free them properly.
1716 *
1717 * XXX: Should not be only for metadata stream or else use an other name.
1718 */
1719 static void destroy_stream_ht(struct lttng_ht *ht)
1720 {
1721 int ret;
1722 struct lttng_ht_iter iter;
1723 struct lttng_consumer_stream *stream;
1724
1725 if (ht == NULL) {
1726 return;
1727 }
1728
1729 rcu_read_lock();
1730 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1731 ret = lttng_ht_del(ht, &iter);
1732 assert(!ret);
1733
1734 call_rcu(&stream->node.head, consumer_free_stream);
1735 }
1736 rcu_read_unlock();
1737
1738 lttng_ht_destroy(ht);
1739 }
1740
1741 /*
1742 * Clean up a metadata stream and free its memory.
1743 */
1744 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
1745 struct lttng_ht *ht)
1746 {
1747 int ret;
1748 struct lttng_ht_iter iter;
1749 struct lttng_consumer_channel *free_chan = NULL;
1750 struct consumer_relayd_sock_pair *relayd;
1751
1752 assert(stream);
1753 /*
1754 * This call should NEVER receive regular stream. It must always be
1755 * metadata stream and this is crucial for data structure synchronization.
1756 */
1757 assert(stream->metadata_flag);
1758
1759 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
1760
1761 if (ht == NULL) {
1762 /* Means the stream was allocated but not successfully added */
1763 goto free_stream;
1764 }
1765
1766 pthread_mutex_lock(&stream->lock);
1767
1768 pthread_mutex_lock(&consumer_data.lock);
1769 switch (consumer_data.type) {
1770 case LTTNG_CONSUMER_KERNEL:
1771 if (stream->mmap_base != NULL) {
1772 ret = munmap(stream->mmap_base, stream->mmap_len);
1773 if (ret != 0) {
1774 PERROR("munmap metadata stream");
1775 }
1776 }
1777 break;
1778 case LTTNG_CONSUMER32_UST:
1779 case LTTNG_CONSUMER64_UST:
1780 lttng_ustconsumer_del_stream(stream);
1781 break;
1782 default:
1783 ERR("Unknown consumer_data type");
1784 assert(0);
1785 goto end;
1786 }
1787
1788 rcu_read_lock();
1789 iter.iter.node = &stream->node.node;
1790 ret = lttng_ht_del(ht, &iter);
1791 assert(!ret);
1792
1793 /* Remove node session id from the consumer_data stream ht */
1794 iter.iter.node = &stream->node_session_id.node;
1795 ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
1796 assert(!ret);
1797 rcu_read_unlock();
1798
1799 if (stream->out_fd >= 0) {
1800 ret = close(stream->out_fd);
1801 if (ret) {
1802 PERROR("close");
1803 }
1804 }
1805
1806 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
1807 ret = close(stream->wait_fd);
1808 if (ret) {
1809 PERROR("close");
1810 }
1811 }
1812
1813 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
1814 ret = close(stream->shm_fd);
1815 if (ret) {
1816 PERROR("close");
1817 }
1818 }
1819
1820 /* Check and cleanup relayd */
1821 rcu_read_lock();
1822 relayd = consumer_find_relayd(stream->net_seq_idx);
1823 if (relayd != NULL) {
1824 uatomic_dec(&relayd->refcount);
1825 assert(uatomic_read(&relayd->refcount) >= 0);
1826
1827 /* Closing streams requires to lock the control socket. */
1828 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1829 ret = relayd_send_close_stream(&relayd->control_sock,
1830 stream->relayd_stream_id, stream->next_net_seq_num - 1);
1831 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1832 if (ret < 0) {
1833 DBG("Unable to close stream on the relayd. Continuing");
1834 /*
1835 * Continue here. There is nothing we can do for the relayd.
1836 * Chances are that the relayd has closed the socket so we just
1837 * continue cleaning up.
1838 */
1839 }
1840
1841 /* Both conditions are met, we destroy the relayd. */
1842 if (uatomic_read(&relayd->refcount) == 0 &&
1843 uatomic_read(&relayd->destroy_flag)) {
1844 destroy_relayd(relayd);
1845 }
1846 }
1847 rcu_read_unlock();
1848
1849 /* Atomically decrement channel refcount since other threads can use it. */
1850 uatomic_dec(&stream->chan->refcount);
1851 if (!uatomic_read(&stream->chan->refcount)
1852 && !uatomic_read(&stream->chan->nb_init_streams)) {
1853 /* Go for channel deletion! */
1854 free_chan = stream->chan;
1855 }
1856
1857 end:
1858 pthread_mutex_unlock(&consumer_data.lock);
1859 pthread_mutex_unlock(&stream->lock);
1860
1861 if (free_chan) {
1862 consumer_del_channel(free_chan);
1863 }
1864
1865 free_stream:
1866 call_rcu(&stream->node.head, consumer_free_stream);
1867 }
1868
1869 /*
1870 * Action done with the metadata stream when adding it to the consumer internal
1871 * data structures to handle it.
1872 */
1873 static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
1874 struct lttng_ht *ht)
1875 {
1876 int ret = 0;
1877 struct consumer_relayd_sock_pair *relayd;
1878
1879 assert(stream);
1880 assert(ht);
1881
1882 DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
1883
1884 pthread_mutex_lock(&consumer_data.lock);
1885
1886 /*
1887 * From here, refcounts are updated so be _careful_ when returning an error
1888 * after this point.
1889 */
1890
1891 rcu_read_lock();
1892 /* Find relayd and, if one is found, increment refcount. */
1893 relayd = consumer_find_relayd(stream->net_seq_idx);
1894 if (relayd != NULL) {
1895 uatomic_inc(&relayd->refcount);
1896 }
1897
1898 /* Update channel refcount once added without error(s). */
1899 uatomic_inc(&stream->chan->refcount);
1900
1901 /*
1902 * When nb_init_streams reaches 0, we don't need to trigger any action in
1903 * terms of destroying the associated channel, because the action that
1904 * causes the count to become 0 also causes a stream to be added. The
1905 * channel deletion will thus be triggered by the following removal of this
1906 * stream.
1907 */
1908 if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
1909 uatomic_dec(&stream->chan->nb_init_streams);
1910 }
1911
1912 /* Steal stream identifier to avoid having streams with the same key */
1913 consumer_steal_stream_key(stream->key, ht);
1914
1915 lttng_ht_add_unique_ulong(ht, &stream->node);
1916
1917 /*
1918 * Add stream to the stream_list_ht of the consumer data. No need to steal
1919 * the key since the HT does not use it and we allow to add redundant keys
1920 * into this table.
1921 */
1922 lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
1923
1924 rcu_read_unlock();
1925
1926 pthread_mutex_unlock(&consumer_data.lock);
1927 return ret;
1928 }
1929
1930 /*
1931 * Delete data stream that are flagged for deletion (endpoint_status).
1932 */
1933 static void validate_endpoint_status_data_stream(void)
1934 {
1935 struct lttng_ht_iter iter;
1936 struct lttng_consumer_stream *stream;
1937
1938 DBG("Consumer delete flagged data stream");
1939
1940 rcu_read_lock();
1941 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
1942 /* Validate delete flag of the stream */
1943 if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
1944 continue;
1945 }
1946 /* Delete it right now */
1947 consumer_del_stream(stream, data_ht);
1948 }
1949 rcu_read_unlock();
1950 }
1951
1952 /*
1953 * Delete metadata stream that are flagged for deletion (endpoint_status).
1954 */
1955 static void validate_endpoint_status_metadata_stream(
1956 struct lttng_poll_event *pollset)
1957 {
1958 struct lttng_ht_iter iter;
1959 struct lttng_consumer_stream *stream;
1960
1961 DBG("Consumer delete flagged metadata stream");
1962
1963 assert(pollset);
1964
1965 rcu_read_lock();
1966 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
1967 /* Validate delete flag of the stream */
1968 if (!stream->endpoint_status) {
1969 continue;
1970 }
1971 /*
1972 * Remove from pollset so the metadata thread can continue without
1973 * blocking on a deleted stream.
1974 */
1975 lttng_poll_del(pollset, stream->wait_fd);
1976
1977 /* Delete it right now */
1978 consumer_del_metadata_stream(stream, metadata_ht);
1979 }
1980 rcu_read_unlock();
1981 }
1982
1983 /*
1984 * Thread polls on metadata file descriptor and write them on disk or on the
1985 * network.
1986 */
1987 void *consumer_thread_metadata_poll(void *data)
1988 {
1989 int ret, i, pollfd;
1990 uint32_t revents, nb_fd;
1991 struct lttng_consumer_stream *stream = NULL;
1992 struct lttng_ht_iter iter;
1993 struct lttng_ht_node_ulong *node;
1994 struct lttng_poll_event events;
1995 struct lttng_consumer_local_data *ctx = data;
1996 ssize_t len;
1997
1998 rcu_register_thread();
1999
2000 DBG("Thread metadata poll started");
2001
2002 /* Size is set to 1 for the consumer_metadata pipe */
2003 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2004 if (ret < 0) {
2005 ERR("Poll set creation failed");
2006 goto end;
2007 }
2008
2009 ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
2010 if (ret < 0) {
2011 goto end;
2012 }
2013
2014 /* Main loop */
2015 DBG("Metadata main loop started");
2016
2017 while (1) {
2018 lttng_poll_reset(&events);
2019
2020 nb_fd = LTTNG_POLL_GETNB(&events);
2021
2022 /* Only the metadata pipe is set */
2023 if (nb_fd == 0 && consumer_quit == 1) {
2024 goto end;
2025 }
2026
2027 restart:
2028 DBG("Metadata poll wait with %d fd(s)", nb_fd);
2029 ret = lttng_poll_wait(&events, -1);
2030 DBG("Metadata event catched in thread");
2031 if (ret < 0) {
2032 if (errno == EINTR) {
2033 ERR("Poll EINTR catched");
2034 goto restart;
2035 }
2036 goto error;
2037 }
2038
2039 /* From here, the event is a metadata wait fd */
2040 for (i = 0; i < nb_fd; i++) {
2041 revents = LTTNG_POLL_GETEV(&events, i);
2042 pollfd = LTTNG_POLL_GETFD(&events, i);
2043
2044 /* Just don't waste time if no returned events for the fd */
2045 if (!revents) {
2046 continue;
2047 }
2048
2049 if (pollfd == ctx->consumer_metadata_pipe[0]) {
2050 if (revents & (LPOLLERR | LPOLLHUP )) {
2051 DBG("Metadata thread pipe hung up");
2052 /*
2053 * Remove the pipe from the poll set and continue the loop
2054 * since their might be data to consume.
2055 */
2056 lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
2057 close(ctx->consumer_metadata_pipe[0]);
2058 continue;
2059 } else if (revents & LPOLLIN) {
2060 do {
2061 /* Get the stream pointer received */
2062 ret = read(pollfd, &stream, sizeof(stream));
2063 } while (ret < 0 && errno == EINTR);
2064 if (ret < 0 ||
2065 ret < sizeof(struct lttng_consumer_stream *)) {
2066 PERROR("read metadata stream");
2067 /*
2068 * Let's continue here and hope we can still work
2069 * without stopping the consumer. XXX: Should we?
2070 */
2071 continue;
2072 }
2073
2074 /* A NULL stream means that the state has changed. */
2075 if (stream == NULL) {
2076 /* Check for deleted streams. */
2077 validate_endpoint_status_metadata_stream(&events);
2078 continue;
2079 }
2080
2081 DBG("Adding metadata stream %d to poll set",
2082 stream->wait_fd);
2083
2084 ret = consumer_add_metadata_stream(stream, metadata_ht);
2085 if (ret) {
2086 ERR("Unable to add metadata stream");
2087 /* Stream was not setup properly. Continuing. */
2088 consumer_del_metadata_stream(stream, NULL);
2089 continue;
2090 }
2091
2092 /* Add metadata stream to the global poll events list */
2093 lttng_poll_add(&events, stream->wait_fd,
2094 LPOLLIN | LPOLLPRI);
2095 }
2096
2097 /* Handle other stream */
2098 continue;
2099 }
2100
2101 rcu_read_lock();
2102 lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
2103 &iter);
2104 node = lttng_ht_iter_get_node_ulong(&iter);
2105 assert(node);
2106
2107 stream = caa_container_of(node, struct lttng_consumer_stream,
2108 node);
2109
2110 /* Check for error event */
2111 if (revents & (LPOLLERR | LPOLLHUP)) {
2112 DBG("Metadata fd %d is hup|err.", pollfd);
2113 if (!stream->hangup_flush_done
2114 && (consumer_data.type == LTTNG_CONSUMER32_UST
2115 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2116 DBG("Attempting to flush and consume the UST buffers");
2117 lttng_ustconsumer_on_stream_hangup(stream);
2118
2119 /* We just flushed the stream now read it. */
2120 do {
2121 len = ctx->on_buffer_ready(stream, ctx);
2122 /*
2123 * We don't check the return value here since if we get
2124 * a negative len, it means an error occured thus we
2125 * simply remove it from the poll set and free the
2126 * stream.
2127 */
2128 } while (len > 0);
2129 }
2130
2131 lttng_poll_del(&events, stream->wait_fd);
2132 /*
2133 * This call update the channel states, closes file descriptors
2134 * and securely free the stream.
2135 */
2136 consumer_del_metadata_stream(stream, metadata_ht);
2137 } else if (revents & (LPOLLIN | LPOLLPRI)) {
2138 /* Get the data out of the metadata file descriptor */
2139 DBG("Metadata available on fd %d", pollfd);
2140 assert(stream->wait_fd == pollfd);
2141
2142 len = ctx->on_buffer_ready(stream, ctx);
2143 /* It's ok to have an unavailable sub-buffer */
2144 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2145 /* Clean up stream from consumer and free it. */
2146 lttng_poll_del(&events, stream->wait_fd);
2147 consumer_del_metadata_stream(stream, metadata_ht);
2148 } else if (len > 0) {
2149 stream->data_read = 1;
2150 }
2151 }
2152
2153 /* Release RCU lock for the stream looked up */
2154 rcu_read_unlock();
2155 }
2156 }
2157
2158 error:
2159 end:
2160 DBG("Metadata poll thread exiting");
2161 lttng_poll_clean(&events);
2162
2163 if (metadata_ht) {
2164 destroy_stream_ht(metadata_ht);
2165 }
2166
2167 rcu_unregister_thread();
2168 return NULL;
2169 }
2170
2171 /*
2172 * This thread polls the fds in the set to consume the data and write
2173 * it to tracefile if necessary.
2174 */
2175 void *consumer_thread_data_poll(void *data)
2176 {
2177 int num_rdy, num_hup, high_prio, ret, i;
2178 struct pollfd *pollfd = NULL;
2179 /* local view of the streams */
2180 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
2181 /* local view of consumer_data.fds_count */
2182 int nb_fd = 0;
2183 struct lttng_consumer_local_data *ctx = data;
2184 ssize_t len;
2185
2186 rcu_register_thread();
2187
2188 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2189 if (data_ht == NULL) {
2190 goto end;
2191 }
2192
2193 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
2194
2195 while (1) {
2196 high_prio = 0;
2197 num_hup = 0;
2198
2199 /*
2200 * the fds set has been updated, we need to update our
2201 * local array as well
2202 */
2203 pthread_mutex_lock(&consumer_data.lock);
2204 if (consumer_data.need_update) {
2205 if (pollfd != NULL) {
2206 free(pollfd);
2207 pollfd = NULL;
2208 }
2209 if (local_stream != NULL) {
2210 free(local_stream);
2211 local_stream = NULL;
2212 }
2213
2214 /* allocate for all fds + 1 for the consumer_data_pipe */
2215 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
2216 if (pollfd == NULL) {
2217 PERROR("pollfd malloc");
2218 pthread_mutex_unlock(&consumer_data.lock);
2219 goto end;
2220 }
2221
2222 /* allocate for all fds + 1 for the consumer_data_pipe */
2223 local_stream = zmalloc((consumer_data.stream_count + 1) *
2224 sizeof(struct lttng_consumer_stream));
2225 if (local_stream == NULL) {
2226 PERROR("local_stream malloc");
2227 pthread_mutex_unlock(&consumer_data.lock);
2228 goto end;
2229 }
2230 ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
2231 data_ht);
2232 if (ret < 0) {
2233 ERR("Error in allocating pollfd or local_outfds");
2234 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2235 pthread_mutex_unlock(&consumer_data.lock);
2236 goto end;
2237 }
2238 nb_fd = ret;
2239 consumer_data.need_update = 0;
2240 }
2241 pthread_mutex_unlock(&consumer_data.lock);
2242
2243 /* No FDs and consumer_quit, consumer_cleanup the thread */
2244 if (nb_fd == 0 && consumer_quit == 1) {
2245 goto end;
2246 }
2247 /* poll on the array of fds */
2248 restart:
2249 DBG("polling on %d fd", nb_fd + 1);
2250 num_rdy = poll(pollfd, nb_fd + 1, -1);
2251 DBG("poll num_rdy : %d", num_rdy);
2252 if (num_rdy == -1) {
2253 /*
2254 * Restart interrupted system call.
2255 */
2256 if (errno == EINTR) {
2257 goto restart;
2258 }
2259 PERROR("Poll error");
2260 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2261 goto end;
2262 } else if (num_rdy == 0) {
2263 DBG("Polling thread timed out");
2264 goto end;
2265 }
2266
2267 /*
2268 * If the consumer_data_pipe triggered poll go directly to the
2269 * beginning of the loop to update the array. We want to prioritize
2270 * array update over low-priority reads.
2271 */
2272 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
2273 size_t pipe_readlen;
2274
2275 DBG("consumer_data_pipe wake up");
2276 /* Consume 1 byte of pipe data */
2277 do {
2278 pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
2279 sizeof(new_stream));
2280 } while (pipe_readlen == -1 && errno == EINTR);
2281
2282 /*
2283 * If the stream is NULL, just ignore it. It's also possible that
2284 * the sessiond poll thread changed the consumer_quit state and is
2285 * waking us up to test it.
2286 */
2287 if (new_stream == NULL) {
2288 validate_endpoint_status_data_stream();
2289 continue;
2290 }
2291
2292 ret = consumer_add_stream(new_stream, data_ht);
2293 if (ret) {
2294 ERR("Consumer add stream %d failed. Continuing",
2295 new_stream->key);
2296 /*
2297 * At this point, if the add_stream fails, it is not in the
2298 * hash table thus passing the NULL value here.
2299 */
2300 consumer_del_stream(new_stream, NULL);
2301 }
2302
2303 /* Continue to update the local streams and handle prio ones */
2304 continue;
2305 }
2306
2307 /* Take care of high priority channels first. */
2308 for (i = 0; i < nb_fd; i++) {
2309 if (local_stream[i] == NULL) {
2310 continue;
2311 }
2312 if (pollfd[i].revents & POLLPRI) {
2313 DBG("Urgent read on fd %d", pollfd[i].fd);
2314 high_prio = 1;
2315 len = ctx->on_buffer_ready(local_stream[i], ctx);
2316 /* it's ok to have an unavailable sub-buffer */
2317 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2318 /* Clean the stream and free it. */
2319 consumer_del_stream(local_stream[i], data_ht);
2320 local_stream[i] = NULL;
2321 } else if (len > 0) {
2322 local_stream[i]->data_read = 1;
2323 }
2324 }
2325 }
2326
2327 /*
2328 * If we read high prio channel in this loop, try again
2329 * for more high prio data.
2330 */
2331 if (high_prio) {
2332 continue;
2333 }
2334
2335 /* Take care of low priority channels. */
2336 for (i = 0; i < nb_fd; i++) {
2337 if (local_stream[i] == NULL) {
2338 continue;
2339 }
2340 if ((pollfd[i].revents & POLLIN) ||
2341 local_stream[i]->hangup_flush_done) {
2342 DBG("Normal read on fd %d", pollfd[i].fd);
2343 len = ctx->on_buffer_ready(local_stream[i], ctx);
2344 /* it's ok to have an unavailable sub-buffer */
2345 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2346 /* Clean the stream and free it. */
2347 consumer_del_stream(local_stream[i], data_ht);
2348 local_stream[i] = NULL;
2349 } else if (len > 0) {
2350 local_stream[i]->data_read = 1;
2351 }
2352 }
2353 }
2354
2355 /* Handle hangup and errors */
2356 for (i = 0; i < nb_fd; i++) {
2357 if (local_stream[i] == NULL) {
2358 continue;
2359 }
2360 if (!local_stream[i]->hangup_flush_done
2361 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2362 && (consumer_data.type == LTTNG_CONSUMER32_UST
2363 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2364 DBG("fd %d is hup|err|nval. Attempting flush and read.",
2365 pollfd[i].fd);
2366 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2367 /* Attempt read again, for the data we just flushed. */
2368 local_stream[i]->data_read = 1;
2369 }
2370 /*
2371 * If the poll flag is HUP/ERR/NVAL and we have
2372 * read no data in this pass, we can remove the
2373 * stream from its hash table.
2374 */
2375 if ((pollfd[i].revents & POLLHUP)) {
2376 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2377 if (!local_stream[i]->data_read) {
2378 consumer_del_stream(local_stream[i], data_ht);
2379 local_stream[i] = NULL;
2380 num_hup++;
2381 }
2382 } else if (pollfd[i].revents & POLLERR) {
2383 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2384 if (!local_stream[i]->data_read) {
2385 consumer_del_stream(local_stream[i], data_ht);
2386 local_stream[i] = NULL;
2387 num_hup++;
2388 }
2389 } else if (pollfd[i].revents & POLLNVAL) {
2390 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2391 if (!local_stream[i]->data_read) {
2392 consumer_del_stream(local_stream[i], data_ht);
2393 local_stream[i] = NULL;
2394 num_hup++;
2395 }
2396 }
2397 if (local_stream[i] != NULL) {
2398 local_stream[i]->data_read = 0;
2399 }
2400 }
2401 }
2402 end:
2403 DBG("polling thread exiting");
2404 if (pollfd != NULL) {
2405 free(pollfd);
2406 pollfd = NULL;
2407 }
2408 if (local_stream != NULL) {
2409 free(local_stream);
2410 local_stream = NULL;
2411 }
2412
2413 /*
2414 * Close the write side of the pipe so epoll_wait() in
2415 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2416 * read side of the pipe. If we close them both, epoll_wait strangely does
2417 * not return and could create a endless wait period if the pipe is the
2418 * only tracked fd in the poll set. The thread will take care of closing
2419 * the read side.
2420 */
2421 close(ctx->consumer_metadata_pipe[1]);
2422
2423 if (data_ht) {
2424 destroy_data_stream_ht(data_ht);
2425 }
2426
2427 rcu_unregister_thread();
2428 return NULL;
2429 }
2430
2431 /*
2432 * This thread listens on the consumerd socket and receives the file
2433 * descriptors from the session daemon.
2434 */
2435 void *consumer_thread_sessiond_poll(void *data)
2436 {
2437 int sock, client_socket, ret;
2438 /*
2439 * structure to poll for incoming data on communication socket avoids
2440 * making blocking sockets.
2441 */
2442 struct pollfd consumer_sockpoll[2];
2443 struct lttng_consumer_local_data *ctx = data;
2444
2445 rcu_register_thread();
2446
2447 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
2448 unlink(ctx->consumer_command_sock_path);
2449 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
2450 if (client_socket < 0) {
2451 ERR("Cannot create command socket");
2452 goto end;
2453 }
2454
2455 ret = lttcomm_listen_unix_sock(client_socket);
2456 if (ret < 0) {
2457 goto end;
2458 }
2459
2460 DBG("Sending ready command to lttng-sessiond");
2461 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
2462 /* return < 0 on error, but == 0 is not fatal */
2463 if (ret < 0) {
2464 ERR("Error sending ready command to lttng-sessiond");
2465 goto end;
2466 }
2467
2468 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
2469 if (ret < 0) {
2470 PERROR("fcntl O_NONBLOCK");
2471 goto end;
2472 }
2473
2474 /* prepare the FDs to poll : to client socket and the should_quit pipe */
2475 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
2476 consumer_sockpoll[0].events = POLLIN | POLLPRI;
2477 consumer_sockpoll[1].fd = client_socket;
2478 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2479
2480 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2481 goto end;
2482 }
2483 DBG("Connection on client_socket");
2484
2485 /* Blocking call, waiting for transmission */
2486 sock = lttcomm_accept_unix_sock(client_socket);
2487 if (sock <= 0) {
2488 WARN("On accept");
2489 goto end;
2490 }
2491 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
2492 if (ret < 0) {
2493 PERROR("fcntl O_NONBLOCK");
2494 goto end;
2495 }
2496
2497 /* update the polling structure to poll on the established socket */
2498 consumer_sockpoll[1].fd = sock;
2499 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2500
2501 while (1) {
2502 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2503 goto end;
2504 }
2505 DBG("Incoming command on sock");
2506 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
2507 if (ret == -ENOENT) {
2508 DBG("Received STOP command");
2509 goto end;
2510 }
2511 if (ret <= 0) {
2512 /*
2513 * This could simply be a session daemon quitting. Don't output
2514 * ERR() here.
2515 */
2516 DBG("Communication interrupted on command socket");
2517 goto end;
2518 }
2519 if (consumer_quit) {
2520 DBG("consumer_thread_receive_fds received quit from signal");
2521 goto end;
2522 }
2523 DBG("received fds on sock");
2524 }
2525 end:
2526 DBG("consumer_thread_receive_fds exiting");
2527
2528 /*
2529 * when all fds have hung up, the polling thread
2530 * can exit cleanly
2531 */
2532 consumer_quit = 1;
2533
2534 /*
2535 * Notify the data poll thread to poll back again and test the
2536 * consumer_quit state that we just set so to quit gracefully.
2537 */
2538 notify_thread_pipe(ctx->consumer_data_pipe[1]);
2539
2540 rcu_unregister_thread();
2541 return NULL;
2542 }
2543
2544 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
2545 struct lttng_consumer_local_data *ctx)
2546 {
2547 switch (consumer_data.type) {
2548 case LTTNG_CONSUMER_KERNEL:
2549 return lttng_kconsumer_read_subbuffer(stream, ctx);
2550 case LTTNG_CONSUMER32_UST:
2551 case LTTNG_CONSUMER64_UST:
2552 return lttng_ustconsumer_read_subbuffer(stream, ctx);
2553 default:
2554 ERR("Unknown consumer_data type");
2555 assert(0);
2556 return -ENOSYS;
2557 }
2558 }
2559
2560 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
2561 {
2562 switch (consumer_data.type) {
2563 case LTTNG_CONSUMER_KERNEL:
2564 return lttng_kconsumer_on_recv_stream(stream);
2565 case LTTNG_CONSUMER32_UST:
2566 case LTTNG_CONSUMER64_UST:
2567 return lttng_ustconsumer_on_recv_stream(stream);
2568 default:
2569 ERR("Unknown consumer_data type");
2570 assert(0);
2571 return -ENOSYS;
2572 }
2573 }
2574
2575 /*
2576 * Allocate and set consumer data hash tables.
2577 */
2578 void lttng_consumer_init(void)
2579 {
2580 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2581 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2582 consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2583
2584 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2585 assert(metadata_ht);
2586 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2587 assert(data_ht);
2588 }
2589
2590 /*
2591 * Process the ADD_RELAYD command receive by a consumer.
2592 *
2593 * This will create a relayd socket pair and add it to the relayd hash table.
2594 * The caller MUST acquire a RCU read side lock before calling it.
2595 */
2596 int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
2597 struct lttng_consumer_local_data *ctx, int sock,
2598 struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
2599 {
2600 int fd, ret = -1;
2601 struct consumer_relayd_sock_pair *relayd;
2602
2603 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
2604
2605 /* Get relayd reference if exists. */
2606 relayd = consumer_find_relayd(net_seq_idx);
2607 if (relayd == NULL) {
2608 /* Not found. Allocate one. */
2609 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
2610 if (relayd == NULL) {
2611 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
2612 goto error;
2613 }
2614 }
2615
2616 /* Poll on consumer socket. */
2617 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2618 ret = -EINTR;
2619 goto error;
2620 }
2621
2622 /* Get relayd socket from session daemon */
2623 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
2624 if (ret != sizeof(fd)) {
2625 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
2626 ret = -1;
2627 goto error;
2628 }
2629
2630 /* Copy socket information and received FD */
2631 switch (sock_type) {
2632 case LTTNG_STREAM_CONTROL:
2633 /* Copy received lttcomm socket */
2634 lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
2635 ret = lttcomm_create_sock(&relayd->control_sock);
2636 if (ret < 0) {
2637 goto error;
2638 }
2639
2640 /* Close the created socket fd which is useless */
2641 close(relayd->control_sock.fd);
2642
2643 /* Assign new file descriptor */
2644 relayd->control_sock.fd = fd;
2645 break;
2646 case LTTNG_STREAM_DATA:
2647 /* Copy received lttcomm socket */
2648 lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
2649 ret = lttcomm_create_sock(&relayd->data_sock);
2650 if (ret < 0) {
2651 goto error;
2652 }
2653
2654 /* Close the created socket fd which is useless */
2655 close(relayd->data_sock.fd);
2656
2657 /* Assign new file descriptor */
2658 relayd->data_sock.fd = fd;
2659 break;
2660 default:
2661 ERR("Unknown relayd socket type (%d)", sock_type);
2662 goto error;
2663 }
2664
2665 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
2666 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
2667 relayd->net_seq_idx, fd);
2668
2669 /*
2670 * Add relayd socket pair to consumer data hashtable. If object already
2671 * exists or on error, the function gracefully returns.
2672 */
2673 add_relayd(relayd);
2674
2675 /* All good! */
2676 ret = 0;
2677
2678 error:
2679 return ret;
2680 }
2681
2682 /*
2683 * Try to lock the stream mutex.
2684 *
2685 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
2686 */
2687 static int stream_try_lock(struct lttng_consumer_stream *stream)
2688 {
2689 int ret;
2690
2691 assert(stream);
2692
2693 /*
2694 * Try to lock the stream mutex. On failure, we know that the stream is
2695 * being used else where hence there is data still being extracted.
2696 */
2697 ret = pthread_mutex_trylock(&stream->lock);
2698 if (ret) {
2699 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
2700 ret = 0;
2701 goto end;
2702 }
2703
2704 ret = 1;
2705
2706 end:
2707 return ret;
2708 }
2709
2710 /*
2711 * Check if for a given session id there is still data needed to be extract
2712 * from the buffers.
2713 *
2714 * Return 1 if data is pending or else 0 meaning ready to be read.
2715 */
2716 int consumer_data_pending(uint64_t id)
2717 {
2718 int ret;
2719 struct lttng_ht_iter iter;
2720 struct lttng_ht *ht;
2721 struct lttng_consumer_stream *stream;
2722 struct consumer_relayd_sock_pair *relayd;
2723 int (*data_pending)(struct lttng_consumer_stream *);
2724
2725 DBG("Consumer data pending command on session id %" PRIu64, id);
2726
2727 rcu_read_lock();
2728 pthread_mutex_lock(&consumer_data.lock);
2729
2730 switch (consumer_data.type) {
2731 case LTTNG_CONSUMER_KERNEL:
2732 data_pending = lttng_kconsumer_data_pending;
2733 break;
2734 case LTTNG_CONSUMER32_UST:
2735 case LTTNG_CONSUMER64_UST:
2736 data_pending = lttng_ustconsumer_data_pending;
2737 break;
2738 default:
2739 ERR("Unknown consumer data type");
2740 assert(0);
2741 }
2742
2743 /* Ease our life a bit */
2744 ht = consumer_data.stream_list_ht;
2745
2746 cds_lfht_for_each_entry_duplicate(ht->ht,
2747 ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
2748 ht->match_fct, (void *)((unsigned long) id),
2749 &iter.iter, stream, node_session_id.node) {
2750 /* If this call fails, the stream is being used hence data pending. */
2751 ret = stream_try_lock(stream);
2752 if (!ret) {
2753 goto data_not_pending;
2754 }
2755
2756 /*
2757 * A removed node from the hash table indicates that the stream has
2758 * been deleted thus having a guarantee that the buffers are closed
2759 * on the consumer side. However, data can still be transmitted
2760 * over the network so don't skip the relayd check.
2761 */
2762 ret = cds_lfht_is_node_deleted(&stream->node.node);
2763 if (!ret) {
2764 /* Check the stream if there is data in the buffers. */
2765 ret = data_pending(stream);
2766 if (ret == 1) {
2767 pthread_mutex_unlock(&stream->lock);
2768 goto data_not_pending;
2769 }
2770 }
2771
2772 /* Relayd check */
2773 if (stream->net_seq_idx != -1) {
2774 relayd = consumer_find_relayd(stream->net_seq_idx);
2775 if (!relayd) {
2776 /*
2777 * At this point, if the relayd object is not available for the
2778 * given stream, it is because the relayd is being cleaned up
2779 * so every stream associated with it (for a session id value)
2780 * are or will be marked for deletion hence no data pending.
2781 */
2782 pthread_mutex_unlock(&stream->lock);
2783 goto data_not_pending;
2784 }
2785
2786 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
2787 if (stream->metadata_flag) {
2788 ret = relayd_quiescent_control(&relayd->control_sock);
2789 } else {
2790 ret = relayd_data_pending(&relayd->control_sock,
2791 stream->relayd_stream_id, stream->next_net_seq_num);
2792 }
2793 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
2794 if (ret == 1) {
2795 pthread_mutex_unlock(&stream->lock);
2796 goto data_not_pending;
2797 }
2798 }
2799 pthread_mutex_unlock(&stream->lock);
2800 }
2801
2802 /*
2803 * Finding _no_ node in the hash table means that the stream(s) have been
2804 * removed thus data is guaranteed to be available for analysis from the
2805 * trace files. This is *only* true for local consumer and not network
2806 * streaming.
2807 */
2808
2809 /* Data is available to be read by a viewer. */
2810 pthread_mutex_unlock(&consumer_data.lock);
2811 rcu_read_unlock();
2812 return 0;
2813
2814 data_not_pending:
2815 /* Data is still being extracted from buffers. */
2816 pthread_mutex_unlock(&consumer_data.lock);
2817 rcu_read_unlock();
2818 return 1;
2819 }
This page took 0.150557 seconds and 3 git commands to generate.