Fix: futex wait: handle spurious futex wakeups
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <poll.h>
12 #include <pthread.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <sys/mman.h>
16 #include <sys/socket.h>
17 #include <sys/types.h>
18 #include <inttypes.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21 #include <stdint.h>
22
23 #include <bin/lttng-consumerd/health-consumerd.hpp>
24 #include <common/common.hpp>
25 #include <common/kernel-ctl/kernel-ctl.hpp>
26 #include <common/sessiond-comm/sessiond-comm.hpp>
27 #include <common/sessiond-comm/relayd.hpp>
28 #include <common/compat/fcntl.hpp>
29 #include <common/compat/endian.hpp>
30 #include <common/pipe.hpp>
31 #include <common/relayd/relayd.hpp>
32 #include <common/utils.hpp>
33 #include <common/consumer/consumer-stream.hpp>
34 #include <common/index/index.hpp>
35 #include <common/consumer/consumer-timer.hpp>
36 #include <common/optional.hpp>
37 #include <common/buffer-view.hpp>
38 #include <common/consumer/consumer.hpp>
39 #include <common/consumer/metadata-bucket.hpp>
40
41 #include "kernel-consumer.hpp"
42
43 extern struct lttng_consumer_global_data the_consumer_data;
44 extern int consumer_poll_timeout;
45
46 /*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
51 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
52 {
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
62 PERROR("Getting sub-buffer snapshot.");
63 }
64
65 return ret;
66 }
67
68 /*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73 int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75 {
76 LTTNG_ASSERT(stream);
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79 }
80
81 /*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
86 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
87 unsigned long *pos)
88 {
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
94 PERROR("kernctl_snapshot_get_produced");
95 }
96
97 return ret;
98 }
99
100 /*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107 {
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
113 PERROR("kernctl_snapshot_get_consumed");
114 }
115
116 return ret;
117 }
118
119 static
120 int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122 {
123 int ret;
124 unsigned long mmap_offset;
125 const char *mmap_base = (const char *) stream->mmap_base;
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134 error:
135 return ret;
136 }
137
/*
 * Take a snapshot of all the streams of a channel.
 *
 * RCU read-side lock must be held across this function to ensure existence of
 * channel. The channel lock is also taken internally for the whole duration
 * to prevent concurrent channel modifications.
 *
 * key:                   channel key (used for logging only here).
 * path:                  output path used when sending streams to a relayd.
 * relayd_id:             relayd network sequence index, or (uint64_t) -1ULL
 *                        for local output.
 * nb_packets_per_stream: cap on the number of packets captured per stream.
 *
 * Returns 0 on success, < 0 on error
 */
static int lttng_kconsumer_snapshot_channel(
		struct lttng_consumer_channel *channel,
		uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream)
{
	int ret;
	struct lttng_consumer_stream *stream;

	DBG("Kernel consumer snapshot channel %" PRIu64, key);

	/* Prevent channel modifications while we perform the snapshot. */
	pthread_mutex_lock(&channel->lock);

	rcu_read_lock();

	/* Splice is not supported yet for channel snapshot. */
	if (channel->output != CONSUMER_CHANNEL_MMAP) {
		ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
				channel->name);
		ret = -1;
		goto end;
	}

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		unsigned long consumed_pos, produced_pos;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		/*
		 * Pin the channel's current trace chunk on the stream for the
		 * duration of the snapshot; released at the bottom of the loop.
		 */
		LTTNG_ASSERT(channel->trace_chunk);
		if (!lttng_trace_chunk_get(channel->trace_chunk)) {
			/*
			 * Can't happen barring an internal error as the channel
			 * holds a reference to the trace chunk.
			 */
			ERR("Failed to acquire reference to channel's trace chunk");
			ret = -1;
			goto end_unlock;
		}
		LTTNG_ASSERT(!stream->trace_chunk);
		stream->trace_chunk = channel->trace_chunk;

		/*
		 * Assign the received relayd ID so we can use it for streaming. The streams
		 * are not visible to anyone so this is OK to change it.
		 */
		stream->net_seq_idx = relayd_id;
		channel->relayd_id = relayd_id;
		if (relayd_id != (uint64_t) -1ULL) {
			/* Remote output: announce the stream to the relayd. */
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				ERR("sending stream to relayd");
				goto end_unlock;
			}
		} else {
			/* Local output: create the on-disk trace files. */
			ret = consumer_stream_create_output_files(stream,
					false);
			if (ret < 0) {
				goto end_unlock;
			}
			DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
					stream->key);
		}

		ret = kernctl_buffer_flush_empty(stream->wait_fd);
		if (ret < 0) {
			/*
			 * Doing a buffer flush which does not take into
			 * account empty packets. This is not perfect
			 * for stream intersection, but required as a
			 * fall-back when "flush_empty" is not
			 * implemented by lttng-modules.
			 */
			ret = kernctl_buffer_flush(stream->wait_fd);
			if (ret < 0) {
				ERR("Failed to flush kernel stream");
				goto end_unlock;
			}
			/*
			 * NOTE(review): this jump is taken even when the
			 * fall-back flush SUCCEEDED, which skips the snapshot
			 * of this stream and of all remaining streams while
			 * returning 0 — confirm this early-out is intentional.
			 */
			goto end_unlock;
		}

		ret = lttng_kconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking kernel snapshot");
			goto end_unlock;
		}

		/* Sample the window of sub-buffers to copy out. */
		ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced kernel snapshot position");
			goto end_unlock;
		}

		ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd kernel snapshot position");
			goto end_unlock;
		}

		/*
		 * Move the start position forward so that at most
		 * nb_packets_per_stream packets are captured.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		/*
		 * Signed difference makes the comparison robust to position
		 * counter wrap-around.
		 */
		while ((long) (consumed_pos - produced_pos) < 0) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();
			DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);

			ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("kernctl_get_subbuf snapshot");
					goto end_unlock;
				}
				/*
				 * Sub-buffer not available (e.g. overwritten by
				 * the tracer); account it as lost and move on.
				 */
				DBG("Kernel consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(
					subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(
					stream, &subbuf_view,
					padded_len - len);
			/*
			 * We write the padded len in local tracefiles but the data len
			 * when using a relay. Display the error but continue processing
			 * to try to release the subbuffer.
			 */
			if (relayd_id != (uint64_t) -1ULL) {
				if (read_len != len) {
					ERR("Error sending to the relay (ret: %zd != len: %lu)",
							read_len, len);
				}
			} else {
				if (read_len != padded_len) {
					ERR("Error writing to tracefile (ret: %zd != len: %lu)",
							read_len, padded_len);
				}
			}

			ret = kernctl_put_subbuf(stream->wait_fd);
			if (ret < 0) {
				ERR("Snapshot kernctl_put_subbuf");
				goto end_unlock;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Per-stream teardown of the snapshot output. */
		if (relayd_id == (uint64_t) -1ULL) {
			if (stream->out_fd >= 0) {
				ret = close(stream->out_fd);
				if (ret < 0) {
					PERROR("Kernel consumer snapshot close out_fd");
					goto end_unlock;
				}
				stream->out_fd = -1;
			}
		} else {
			close_relayd_stream(stream);
			stream->net_seq_idx = (uint64_t) -1ULL;
		}
		/* Release the trace chunk reference pinned above. */
		lttng_trace_chunk_put(stream->trace_chunk);
		stream->trace_chunk = NULL;
		pthread_mutex_unlock(&stream->lock);
	}

	/* All good! */
	ret = 0;
	goto end;

error_put_subbuf:
	/* Release the sub-buffer before unwinding; preserve the error in ret. */
	ret = kernctl_put_subbuf(stream->wait_fd);
	if (ret < 0) {
		ERR("Snapshot kernctl_put_subbuf error path");
	}
end_unlock:
	pthread_mutex_unlock(&stream->lock);
end:
	rcu_read_unlock();
	pthread_mutex_unlock(&channel->lock);
	return ret;
}
354
/*
 * Read the whole metadata available for a snapshot.
 *
 * RCU read-side lock must be held across this function to ensure existence of
 * metadata_channel.
 *
 * The metadata stream is consumed until exhaustion and then destroyed on
 * both the success and error paths; metadata_channel->metadata_stream is
 * reset to NULL before returning.
 *
 * key:       metadata channel key (used for logging only here).
 * path:      output path used when sending the stream to a relayd.
 * relayd_id: relayd network sequence index, or (uint64_t) -1ULL for
 *            local output.
 *
 * Returns 0 on success, < 0 on error
 */
static int lttng_kconsumer_snapshot_metadata(
		struct lttng_consumer_channel *metadata_channel,
		uint64_t key, char *path, uint64_t relayd_id,
		struct lttng_consumer_local_data *ctx)
{
	int ret, use_relayd = 0;
	ssize_t ret_read;
	struct lttng_consumer_stream *metadata_stream;

	LTTNG_ASSERT(ctx);

	DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
			key, path);

	rcu_read_lock();

	metadata_stream = metadata_channel->metadata_stream;
	LTTNG_ASSERT(metadata_stream);

	/* Serialize against other readers of the metadata stream. */
	metadata_stream->read_subbuffer_ops.lock(metadata_stream);
	LTTNG_ASSERT(metadata_channel->trace_chunk);
	LTTNG_ASSERT(metadata_stream->trace_chunk);

	/* Flag once that we have a valid relayd for the stream. */
	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	if (use_relayd) {
		/* Remote output: announce the stream to the relayd. */
		ret = consumer_send_relayd_stream(metadata_stream, path);
		if (ret < 0) {
			goto error_snapshot;
		}
	} else {
		/* Local output: create the on-disk metadata file. */
		ret = consumer_stream_create_output_files(metadata_stream,
				false);
		if (ret < 0) {
			goto error_snapshot;
		}
	}

	/* Drain the metadata stream until no data is left (ret_read == 0). */
	do {
		health_code_update();

		ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
		if (ret_read < 0) {
			ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
					ret_read);
			ret = ret_read;
			goto error_snapshot;
		}
	} while (ret_read > 0);

	if (use_relayd) {
		close_relayd_stream(metadata_stream);
		metadata_stream->net_seq_idx = (uint64_t) -1ULL;
	} else {
		if (metadata_stream->out_fd >= 0) {
			ret = close(metadata_stream->out_fd);
			if (ret < 0) {
				PERROR("Kernel consumer snapshot metadata close out_fd");
				/*
				 * Don't go on error here since the snapshot was successful at this
				 * point but somehow the close failed.
				 */
			}
			metadata_stream->out_fd = -1;
			lttng_trace_chunk_put(metadata_stream->trace_chunk);
			metadata_stream->trace_chunk = NULL;
		}
	}

	ret = 0;
error_snapshot:
	/* Common teardown: the metadata stream is destroyed in all cases. */
	metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
	consumer_stream_destroy(metadata_stream, NULL);
	metadata_channel->metadata_stream = NULL;
	rcu_read_unlock();
	return ret;
}
442
443 /*
444 * Receive command from session daemon and process it.
445 *
446 * Return 1 on success else a negative value or 0.
447 */
448 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
449 int sock, struct pollfd *consumer_sockpoll)
450 {
451 int ret_func;
452 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
453 struct lttcomm_consumer_msg msg;
454
455 health_code_update();
456
457 {
458 ssize_t ret_recv;
459
460 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
461 if (ret_recv != sizeof(msg)) {
462 if (ret_recv > 0) {
463 lttng_consumer_send_error(ctx,
464 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
465 ret_recv = -1;
466 }
467 return ret_recv;
468 }
469 }
470
471 health_code_update();
472
473 /* Deprecated command */
474 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
475
476 health_code_update();
477
478 /* relayd needs RCU read-side protection */
479 rcu_read_lock();
480
481 switch (msg.cmd_type) {
482 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
483 {
484 uint32_t major = msg.u.relayd_sock.major;
485 uint32_t minor = msg.u.relayd_sock.minor;
486 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
487 msg.u.relayd_sock.relayd_socket_protocol;
488
489 /* Session daemon status message are handled in the following call. */
490 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
491 msg.u.relayd_sock.type, ctx, sock,
492 consumer_sockpoll, msg.u.relayd_sock.session_id,
493 msg.u.relayd_sock.relayd_session_id, major,
494 minor, protocol);
495 goto end_nosignal;
496 }
497 case LTTNG_CONSUMER_ADD_CHANNEL:
498 {
499 struct lttng_consumer_channel *new_channel;
500 int ret_send_status, ret_add_channel = 0;
501 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
502
503 health_code_update();
504
505 /* First send a status message before receiving the fds. */
506 ret_send_status = consumer_send_status_msg(sock, ret_code);
507 if (ret_send_status < 0) {
508 /* Somehow, the session daemon is not responding anymore. */
509 goto error_fatal;
510 }
511
512 health_code_update();
513
514 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
515 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
516 msg.u.channel.session_id,
517 msg.u.channel.chunk_id.is_set ?
518 &chunk_id : NULL,
519 msg.u.channel.pathname,
520 msg.u.channel.name,
521 msg.u.channel.relayd_id, msg.u.channel.output,
522 msg.u.channel.tracefile_size,
523 msg.u.channel.tracefile_count, 0,
524 msg.u.channel.monitor,
525 msg.u.channel.live_timer_interval,
526 msg.u.channel.is_live,
527 NULL, NULL);
528 if (new_channel == NULL) {
529 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
530 goto end_nosignal;
531 }
532 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
533 switch (msg.u.channel.output) {
534 case LTTNG_EVENT_SPLICE:
535 new_channel->output = CONSUMER_CHANNEL_SPLICE;
536 break;
537 case LTTNG_EVENT_MMAP:
538 new_channel->output = CONSUMER_CHANNEL_MMAP;
539 break;
540 default:
541 ERR("Channel output unknown %d", msg.u.channel.output);
542 goto end_nosignal;
543 }
544
545 /* Translate and save channel type. */
546 switch (msg.u.channel.type) {
547 case CONSUMER_CHANNEL_TYPE_DATA:
548 case CONSUMER_CHANNEL_TYPE_METADATA:
549 new_channel->type = (consumer_channel_type) msg.u.channel.type;
550 break;
551 default:
552 abort();
553 goto end_nosignal;
554 };
555
556 health_code_update();
557
558 if (ctx->on_recv_channel != NULL) {
559 int ret_recv_channel =
560 ctx->on_recv_channel(new_channel);
561 if (ret_recv_channel == 0) {
562 ret_add_channel = consumer_add_channel(
563 new_channel, ctx);
564 } else if (ret_recv_channel < 0) {
565 goto end_nosignal;
566 }
567 } else {
568 ret_add_channel =
569 consumer_add_channel(new_channel, ctx);
570 }
571 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
572 !ret_add_channel) {
573 int monitor_start_ret;
574
575 DBG("Consumer starting monitor timer");
576 consumer_timer_live_start(new_channel,
577 msg.u.channel.live_timer_interval);
578 monitor_start_ret = consumer_timer_monitor_start(
579 new_channel,
580 msg.u.channel.monitor_timer_interval);
581 if (monitor_start_ret < 0) {
582 ERR("Starting channel monitoring timer failed");
583 goto end_nosignal;
584 }
585 }
586
587 health_code_update();
588
589 /* If we received an error in add_channel, we need to report it. */
590 if (ret_add_channel < 0) {
591 ret_send_status = consumer_send_status_msg(
592 sock, ret_add_channel);
593 if (ret_send_status < 0) {
594 goto error_fatal;
595 }
596 goto end_nosignal;
597 }
598
599 goto end_nosignal;
600 }
601 case LTTNG_CONSUMER_ADD_STREAM:
602 {
603 int fd;
604 struct lttng_pipe *stream_pipe;
605 struct lttng_consumer_stream *new_stream;
606 struct lttng_consumer_channel *channel;
607 int alloc_ret = 0;
608 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
609 ssize_t ret_pipe_write, ret_recv;
610
611 /*
612 * Get stream's channel reference. Needed when adding the stream to the
613 * global hash table.
614 */
615 channel = consumer_find_channel(msg.u.stream.channel_key);
616 if (!channel) {
617 /*
618 * We could not find the channel. Can happen if cpu hotplug
619 * happens while tearing down.
620 */
621 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
622 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
623 }
624
625 health_code_update();
626
627 /* First send a status message before receiving the fds. */
628 ret_send_status = consumer_send_status_msg(sock, ret_code);
629 if (ret_send_status < 0) {
630 /* Somehow, the session daemon is not responding anymore. */
631 goto error_add_stream_fatal;
632 }
633
634 health_code_update();
635
636 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
637 /* Channel was not found. */
638 goto error_add_stream_nosignal;
639 }
640
641 /* Blocking call */
642 health_poll_entry();
643 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
644 health_poll_exit();
645 if (ret_poll) {
646 goto error_add_stream_fatal;
647 }
648
649 health_code_update();
650
651 /* Get stream file descriptor from socket */
652 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
653 if (ret_recv != sizeof(fd)) {
654 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
655 ret_func = ret_recv;
656 goto end;
657 }
658
659 health_code_update();
660
661 /*
662 * Send status code to session daemon only if the recv works. If the
663 * above recv() failed, the session daemon is notified through the
664 * error socket and the teardown is eventually done.
665 */
666 ret_send_status = consumer_send_status_msg(sock, ret_code);
667 if (ret_send_status < 0) {
668 /* Somehow, the session daemon is not responding anymore. */
669 goto error_add_stream_nosignal;
670 }
671
672 health_code_update();
673
674 pthread_mutex_lock(&channel->lock);
675 new_stream = consumer_stream_create(
676 channel,
677 channel->key,
678 fd,
679 channel->name,
680 channel->relayd_id,
681 channel->session_id,
682 channel->trace_chunk,
683 msg.u.stream.cpu,
684 &alloc_ret,
685 channel->type,
686 channel->monitor);
687 if (new_stream == NULL) {
688 switch (alloc_ret) {
689 case -ENOMEM:
690 case -EINVAL:
691 default:
692 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
693 break;
694 }
695 pthread_mutex_unlock(&channel->lock);
696 goto error_add_stream_nosignal;
697 }
698
699 new_stream->wait_fd = fd;
700 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
701 new_stream->wait_fd, &new_stream->max_sb_size);
702 if (ret_get_max_subbuf_size < 0) {
703 pthread_mutex_unlock(&channel->lock);
704 ERR("Failed to get kernel maximal subbuffer size");
705 goto error_add_stream_nosignal;
706 }
707
708 consumer_stream_update_channel_attributes(new_stream,
709 channel);
710
711 /*
712 * We've just assigned the channel to the stream so increment the
713 * refcount right now. We don't need to increment the refcount for
714 * streams in no monitor because we handle manually the cleanup of
715 * those. It is very important to make sure there is NO prior
716 * consumer_del_stream() calls or else the refcount will be unbalanced.
717 */
718 if (channel->monitor) {
719 uatomic_inc(&new_stream->chan->refcount);
720 }
721
722 /*
723 * The buffer flush is done on the session daemon side for the kernel
724 * so no need for the stream "hangup_flush_done" variable to be
725 * tracked. This is important for a kernel stream since we don't rely
726 * on the flush state of the stream to read data. It's not the case for
727 * user space tracing.
728 */
729 new_stream->hangup_flush_done = 0;
730
731 health_code_update();
732
733 pthread_mutex_lock(&new_stream->lock);
734 if (ctx->on_recv_stream) {
735 int ret_recv_stream = ctx->on_recv_stream(new_stream);
736 if (ret_recv_stream < 0) {
737 pthread_mutex_unlock(&new_stream->lock);
738 pthread_mutex_unlock(&channel->lock);
739 consumer_stream_free(new_stream);
740 goto error_add_stream_nosignal;
741 }
742 }
743 health_code_update();
744
745 if (new_stream->metadata_flag) {
746 channel->metadata_stream = new_stream;
747 }
748
749 /* Do not monitor this stream. */
750 if (!channel->monitor) {
751 DBG("Kernel consumer add stream %s in no monitor mode with "
752 "relayd id %" PRIu64, new_stream->name,
753 new_stream->net_seq_idx);
754 cds_list_add(&new_stream->send_node, &channel->streams.head);
755 pthread_mutex_unlock(&new_stream->lock);
756 pthread_mutex_unlock(&channel->lock);
757 goto end_add_stream;
758 }
759
760 /* Send stream to relayd if the stream has an ID. */
761 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
762 int ret_send_relayd_stream;
763
764 ret_send_relayd_stream = consumer_send_relayd_stream(
765 new_stream, new_stream->chan->pathname);
766 if (ret_send_relayd_stream < 0) {
767 pthread_mutex_unlock(&new_stream->lock);
768 pthread_mutex_unlock(&channel->lock);
769 consumer_stream_free(new_stream);
770 goto error_add_stream_nosignal;
771 }
772
773 /*
774 * If adding an extra stream to an already
775 * existing channel (e.g. cpu hotplug), we need
776 * to send the "streams_sent" command to relayd.
777 */
778 if (channel->streams_sent_to_relayd) {
779 int ret_send_relayd_streams_sent;
780
781 ret_send_relayd_streams_sent =
782 consumer_send_relayd_streams_sent(
783 new_stream->net_seq_idx);
784 if (ret_send_relayd_streams_sent < 0) {
785 pthread_mutex_unlock(&new_stream->lock);
786 pthread_mutex_unlock(&channel->lock);
787 goto error_add_stream_nosignal;
788 }
789 }
790 }
791 pthread_mutex_unlock(&new_stream->lock);
792 pthread_mutex_unlock(&channel->lock);
793
794 /* Get the right pipe where the stream will be sent. */
795 if (new_stream->metadata_flag) {
796 consumer_add_metadata_stream(new_stream);
797 stream_pipe = ctx->consumer_metadata_pipe;
798 } else {
799 consumer_add_data_stream(new_stream);
800 stream_pipe = ctx->consumer_data_pipe;
801 }
802
803 /* Visible to other threads */
804 new_stream->globally_visible = 1;
805
806 health_code_update();
807
808 ret_pipe_write = lttng_pipe_write(
809 stream_pipe, &new_stream, sizeof(new_stream));
810 if (ret_pipe_write < 0) {
811 ERR("Consumer write %s stream to pipe %d",
812 new_stream->metadata_flag ? "metadata" : "data",
813 lttng_pipe_get_writefd(stream_pipe));
814 if (new_stream->metadata_flag) {
815 consumer_del_stream_for_metadata(new_stream);
816 } else {
817 consumer_del_stream_for_data(new_stream);
818 }
819 goto error_add_stream_nosignal;
820 }
821
822 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
823 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
824 end_add_stream:
825 break;
826 error_add_stream_nosignal:
827 goto end_nosignal;
828 error_add_stream_fatal:
829 goto error_fatal;
830 }
831 case LTTNG_CONSUMER_STREAMS_SENT:
832 {
833 struct lttng_consumer_channel *channel;
834 int ret_send_status;
835
836 /*
837 * Get stream's channel reference. Needed when adding the stream to the
838 * global hash table.
839 */
840 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
841 if (!channel) {
842 /*
843 * We could not find the channel. Can happen if cpu hotplug
844 * happens while tearing down.
845 */
846 ERR("Unable to find channel key %" PRIu64,
847 msg.u.sent_streams.channel_key);
848 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
849 }
850
851 health_code_update();
852
853 /*
854 * Send status code to session daemon.
855 */
856 ret_send_status = consumer_send_status_msg(sock, ret_code);
857 if (ret_send_status < 0 ||
858 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
859 /* Somehow, the session daemon is not responding anymore. */
860 goto error_streams_sent_nosignal;
861 }
862
863 health_code_update();
864
865 /*
866 * We should not send this message if we don't monitor the
867 * streams in this channel.
868 */
869 if (!channel->monitor) {
870 goto end_error_streams_sent;
871 }
872
873 health_code_update();
874 /* Send stream to relayd if the stream has an ID. */
875 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
876 int ret_send_relay_streams;
877
878 ret_send_relay_streams = consumer_send_relayd_streams_sent(
879 msg.u.sent_streams.net_seq_idx);
880 if (ret_send_relay_streams < 0) {
881 goto error_streams_sent_nosignal;
882 }
883 channel->streams_sent_to_relayd = true;
884 }
885 end_error_streams_sent:
886 break;
887 error_streams_sent_nosignal:
888 goto end_nosignal;
889 }
890 case LTTNG_CONSUMER_UPDATE_STREAM:
891 {
892 rcu_read_unlock();
893 return -ENOSYS;
894 }
895 case LTTNG_CONSUMER_DESTROY_RELAYD:
896 {
897 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
898 struct consumer_relayd_sock_pair *relayd;
899 int ret_send_status;
900
901 DBG("Kernel consumer destroying relayd %" PRIu64, index);
902
903 /* Get relayd reference if exists. */
904 relayd = consumer_find_relayd(index);
905 if (relayd == NULL) {
906 DBG("Unable to find relayd %" PRIu64, index);
907 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
908 }
909
910 /*
911 * Each relayd socket pair has a refcount of stream attached to it
912 * which tells if the relayd is still active or not depending on the
913 * refcount value.
914 *
915 * This will set the destroy flag of the relayd object and destroy it
916 * if the refcount reaches zero when called.
917 *
918 * The destroy can happen either here or when a stream fd hangs up.
919 */
920 if (relayd) {
921 consumer_flag_relayd_for_destroy(relayd);
922 }
923
924 health_code_update();
925
926 ret_send_status = consumer_send_status_msg(sock, ret_code);
927 if (ret_send_status < 0) {
928 /* Somehow, the session daemon is not responding anymore. */
929 goto error_fatal;
930 }
931
932 goto end_nosignal;
933 }
934 case LTTNG_CONSUMER_DATA_PENDING:
935 {
936 int32_t ret_data_pending;
937 uint64_t id = msg.u.data_pending.session_id;
938 ssize_t ret_send;
939
940 DBG("Kernel consumer data pending command for id %" PRIu64, id);
941
942 ret_data_pending = consumer_data_pending(id);
943
944 health_code_update();
945
946 /* Send back returned value to session daemon */
947 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
948 sizeof(ret_data_pending));
949 if (ret_send < 0) {
950 PERROR("send data pending ret code");
951 goto error_fatal;
952 }
953
954 /*
955 * No need to send back a status message since the data pending
956 * returned value is the response.
957 */
958 break;
959 }
960 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
961 {
962 struct lttng_consumer_channel *channel;
963 uint64_t key = msg.u.snapshot_channel.key;
964 int ret_send_status;
965
966 channel = consumer_find_channel(key);
967 if (!channel) {
968 ERR("Channel %" PRIu64 " not found", key);
969 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
970 } else {
971 if (msg.u.snapshot_channel.metadata == 1) {
972 int ret_snapshot;
973
974 ret_snapshot = lttng_kconsumer_snapshot_metadata(
975 channel, key,
976 msg.u.snapshot_channel.pathname,
977 msg.u.snapshot_channel.relayd_id,
978 ctx);
979 if (ret_snapshot < 0) {
980 ERR("Snapshot metadata failed");
981 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
982 }
983 } else {
984 int ret_snapshot;
985
986 ret_snapshot = lttng_kconsumer_snapshot_channel(
987 channel, key,
988 msg.u.snapshot_channel.pathname,
989 msg.u.snapshot_channel.relayd_id,
990 msg.u.snapshot_channel
991 .nb_packets_per_stream);
992 if (ret_snapshot < 0) {
993 ERR("Snapshot channel failed");
994 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
995 }
996 }
997 }
998 health_code_update();
999
1000 ret_send_status = consumer_send_status_msg(sock, ret_code);
1001 if (ret_send_status < 0) {
1002 /* Somehow, the session daemon is not responding anymore. */
1003 goto end_nosignal;
1004 }
1005 break;
1006 }
1007 case LTTNG_CONSUMER_DESTROY_CHANNEL:
1008 {
1009 uint64_t key = msg.u.destroy_channel.key;
1010 struct lttng_consumer_channel *channel;
1011 int ret_send_status;
1012
1013 channel = consumer_find_channel(key);
1014 if (!channel) {
1015 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
1016 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1017 }
1018
1019 health_code_update();
1020
1021 ret_send_status = consumer_send_status_msg(sock, ret_code);
1022 if (ret_send_status < 0) {
1023 /* Somehow, the session daemon is not responding anymore. */
1024 goto end_destroy_channel;
1025 }
1026
1027 health_code_update();
1028
1029 /* Stop right now if no channel was found. */
1030 if (!channel) {
1031 goto end_destroy_channel;
1032 }
1033
1034 /*
1035 * This command should ONLY be issued for channel with streams set in
1036 * no monitor mode.
1037 */
1038 LTTNG_ASSERT(!channel->monitor);
1039
1040 /*
1041 * The refcount should ALWAYS be 0 in the case of a channel in no
1042 * monitor mode.
1043 */
1044 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
1045
1046 consumer_del_channel(channel);
1047 end_destroy_channel:
1048 goto end_nosignal;
1049 }
1050 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1051 {
1052 ssize_t ret;
1053 uint64_t count;
1054 struct lttng_consumer_channel *channel;
1055 uint64_t id = msg.u.discarded_events.session_id;
1056 uint64_t key = msg.u.discarded_events.channel_key;
1057
1058 DBG("Kernel consumer discarded events command for session id %"
1059 PRIu64 ", channel key %" PRIu64, id, key);
1060
1061 channel = consumer_find_channel(key);
1062 if (!channel) {
1063 ERR("Kernel consumer discarded events channel %"
1064 PRIu64 " not found", key);
1065 count = 0;
1066 } else {
1067 count = channel->discarded_events;
1068 }
1069
1070 health_code_update();
1071
1072 /* Send back returned value to session daemon */
1073 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1074 if (ret < 0) {
1075 PERROR("send discarded events");
1076 goto error_fatal;
1077 }
1078
1079 break;
1080 }
1081 case LTTNG_CONSUMER_LOST_PACKETS:
1082 {
1083 ssize_t ret;
1084 uint64_t count;
1085 struct lttng_consumer_channel *channel;
1086 uint64_t id = msg.u.lost_packets.session_id;
1087 uint64_t key = msg.u.lost_packets.channel_key;
1088
1089 DBG("Kernel consumer lost packets command for session id %"
1090 PRIu64 ", channel key %" PRIu64, id, key);
1091
1092 channel = consumer_find_channel(key);
1093 if (!channel) {
1094 ERR("Kernel consumer lost packets channel %"
1095 PRIu64 " not found", key);
1096 count = 0;
1097 } else {
1098 count = channel->lost_packets;
1099 }
1100
1101 health_code_update();
1102
1103 /* Send back returned value to session daemon */
1104 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1105 if (ret < 0) {
1106 PERROR("send lost packets");
1107 goto error_fatal;
1108 }
1109
1110 break;
1111 }
1112 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1113 {
1114 int channel_monitor_pipe;
1115 int ret_send_status, ret_set_channel_monitor_pipe;
1116 ssize_t ret_recv;
1117
1118 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1119 /* Successfully received the command's type. */
1120 ret_send_status = consumer_send_status_msg(sock, ret_code);
1121 if (ret_send_status < 0) {
1122 goto error_fatal;
1123 }
1124
1125 ret_recv = lttcomm_recv_fds_unix_sock(
1126 sock, &channel_monitor_pipe, 1);
1127 if (ret_recv != sizeof(channel_monitor_pipe)) {
1128 ERR("Failed to receive channel monitor pipe");
1129 goto error_fatal;
1130 }
1131
1132 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1133 ret_set_channel_monitor_pipe =
1134 consumer_timer_thread_set_channel_monitor_pipe(
1135 channel_monitor_pipe);
1136 if (!ret_set_channel_monitor_pipe) {
1137 int flags;
1138 int ret_fcntl;
1139
1140 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1141 /* Set the pipe as non-blocking. */
1142 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1143 if (ret_fcntl == -1) {
1144 PERROR("fcntl get flags of the channel monitoring pipe");
1145 goto error_fatal;
1146 }
1147 flags = ret_fcntl;
1148
1149 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
1150 flags | O_NONBLOCK);
1151 if (ret_fcntl == -1) {
1152 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1153 goto error_fatal;
1154 }
1155 DBG("Channel monitor pipe set as non-blocking");
1156 } else {
1157 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1158 }
1159 ret_send_status = consumer_send_status_msg(sock, ret_code);
1160 if (ret_send_status < 0) {
1161 goto error_fatal;
1162 }
1163 break;
1164 }
1165 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1166 {
1167 struct lttng_consumer_channel *channel;
1168 uint64_t key = msg.u.rotate_channel.key;
1169 int ret_send_status;
1170
1171 DBG("Consumer rotate channel %" PRIu64, key);
1172
1173 channel = consumer_find_channel(key);
1174 if (!channel) {
1175 ERR("Channel %" PRIu64 " not found", key);
1176 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1177 } else {
1178 /*
1179 * Sample the rotate position of all the streams in this channel.
1180 */
1181 int ret_rotate_channel;
1182
1183 ret_rotate_channel = lttng_consumer_rotate_channel(
1184 channel, key,
1185 msg.u.rotate_channel.relayd_id);
1186 if (ret_rotate_channel < 0) {
1187 ERR("Rotate channel failed");
1188 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1189 }
1190
1191 health_code_update();
1192 }
1193
1194 ret_send_status = consumer_send_status_msg(sock, ret_code);
1195 if (ret_send_status < 0) {
1196 /* Somehow, the session daemon is not responding anymore. */
1197 goto error_rotate_channel;
1198 }
1199 if (channel) {
1200 /* Rotate the streams that are ready right now. */
1201 int ret_rotate;
1202
1203 ret_rotate = lttng_consumer_rotate_ready_streams(
1204 channel, key);
1205 if (ret_rotate < 0) {
1206 ERR("Rotate ready streams failed");
1207 }
1208 }
1209 break;
1210 error_rotate_channel:
1211 goto end_nosignal;
1212 }
1213 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1214 {
1215 struct lttng_consumer_channel *channel;
1216 uint64_t key = msg.u.clear_channel.key;
1217 int ret_send_status;
1218
1219 channel = consumer_find_channel(key);
1220 if (!channel) {
1221 DBG("Channel %" PRIu64 " not found", key);
1222 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1223 } else {
1224 int ret_clear_channel;
1225
1226 ret_clear_channel =
1227 lttng_consumer_clear_channel(channel);
1228 if (ret_clear_channel) {
1229 ERR("Clear channel failed");
1230 ret_code = (lttcomm_return_code) ret_clear_channel;
1231 }
1232
1233 health_code_update();
1234 }
1235
1236 ret_send_status = consumer_send_status_msg(sock, ret_code);
1237 if (ret_send_status < 0) {
1238 /* Somehow, the session daemon is not responding anymore. */
1239 goto end_nosignal;
1240 }
1241
1242 break;
1243 }
1244 case LTTNG_CONSUMER_INIT:
1245 {
1246 int ret_send_status;
1247 lttng_uuid sessiond_uuid;
1248
1249 std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
1250 sessiond_uuid.begin());
1251
1252 ret_code = lttng_consumer_init_command(ctx,
1253 sessiond_uuid);
1254 health_code_update();
1255 ret_send_status = consumer_send_status_msg(sock, ret_code);
1256 if (ret_send_status < 0) {
1257 /* Somehow, the session daemon is not responding anymore. */
1258 goto end_nosignal;
1259 }
1260 break;
1261 }
1262 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
1263 {
1264 const struct lttng_credentials credentials = {
1265 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1266 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
1267 };
1268 const bool is_local_trace =
1269 !msg.u.create_trace_chunk.relayd_id.is_set;
1270 const uint64_t relayd_id =
1271 msg.u.create_trace_chunk.relayd_id.value;
1272 const char *chunk_override_name =
1273 *msg.u.create_trace_chunk.override_name ?
1274 msg.u.create_trace_chunk.override_name :
1275 NULL;
1276 struct lttng_directory_handle *chunk_directory_handle = NULL;
1277
1278 /*
1279 * The session daemon will only provide a chunk directory file
1280 * descriptor for local traces.
1281 */
1282 if (is_local_trace) {
1283 int chunk_dirfd;
1284 int ret_send_status;
1285 ssize_t ret_recv;
1286
			/* Acknowledge the reception of the command. */
1288 ret_send_status = consumer_send_status_msg(
1289 sock, LTTCOMM_CONSUMERD_SUCCESS);
1290 if (ret_send_status < 0) {
1291 /* Somehow, the session daemon is not responding anymore. */
1292 goto end_nosignal;
1293 }
1294
1295 ret_recv = lttcomm_recv_fds_unix_sock(
1296 sock, &chunk_dirfd, 1);
1297 if (ret_recv != sizeof(chunk_dirfd)) {
1298 ERR("Failed to receive trace chunk directory file descriptor");
1299 goto error_fatal;
1300 }
1301
1302 DBG("Received trace chunk directory fd (%d)",
1303 chunk_dirfd);
1304 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
1305 chunk_dirfd);
1306 if (!chunk_directory_handle) {
1307 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1308 if (close(chunk_dirfd)) {
1309 PERROR("Failed to close chunk directory file descriptor");
1310 }
1311 goto error_fatal;
1312 }
1313 }
1314
1315 ret_code = lttng_consumer_create_trace_chunk(
1316 !is_local_trace ? &relayd_id : NULL,
1317 msg.u.create_trace_chunk.session_id,
1318 msg.u.create_trace_chunk.chunk_id,
1319 (time_t) msg.u.create_trace_chunk
1320 .creation_timestamp,
1321 chunk_override_name,
1322 msg.u.create_trace_chunk.credentials.is_set ?
1323 &credentials :
1324 NULL,
1325 chunk_directory_handle);
1326 lttng_directory_handle_put(chunk_directory_handle);
1327 goto end_msg_sessiond;
1328 }
1329 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
1330 {
1331 enum lttng_trace_chunk_command_type close_command =
1332 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
1333 const uint64_t relayd_id =
1334 msg.u.close_trace_chunk.relayd_id.value;
1335 struct lttcomm_consumer_close_trace_chunk_reply reply;
1336 char path[LTTNG_PATH_MAX];
1337 ssize_t ret_send;
1338
1339 ret_code = lttng_consumer_close_trace_chunk(
1340 msg.u.close_trace_chunk.relayd_id.is_set ?
1341 &relayd_id :
1342 NULL,
1343 msg.u.close_trace_chunk.session_id,
1344 msg.u.close_trace_chunk.chunk_id,
1345 (time_t) msg.u.close_trace_chunk.close_timestamp,
1346 msg.u.close_trace_chunk.close_command.is_set ?
1347 &close_command :
1348 NULL, path);
1349 reply.ret_code = ret_code;
1350 reply.path_length = strlen(path) + 1;
1351 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1352 if (ret_send != sizeof(reply)) {
1353 goto error_fatal;
1354 }
1355 ret_send = lttcomm_send_unix_sock(
1356 sock, path, reply.path_length);
1357 if (ret_send != reply.path_length) {
1358 goto error_fatal;
1359 }
1360 goto end_nosignal;
1361 }
1362 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
1363 {
1364 const uint64_t relayd_id =
1365 msg.u.trace_chunk_exists.relayd_id.value;
1366
1367 ret_code = lttng_consumer_trace_chunk_exists(
1368 msg.u.trace_chunk_exists.relayd_id.is_set ?
1369 &relayd_id : NULL,
1370 msg.u.trace_chunk_exists.session_id,
1371 msg.u.trace_chunk_exists.chunk_id);
1372 goto end_msg_sessiond;
1373 }
1374 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1375 {
1376 const uint64_t key = msg.u.open_channel_packets.key;
1377 struct lttng_consumer_channel *channel =
1378 consumer_find_channel(key);
1379
1380 if (channel) {
1381 pthread_mutex_lock(&channel->lock);
1382 ret_code = lttng_consumer_open_channel_packets(channel);
1383 pthread_mutex_unlock(&channel->lock);
1384 } else {
1385 WARN("Channel %" PRIu64 " not found", key);
1386 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1387 }
1388
1389 health_code_update();
1390 goto end_msg_sessiond;
1391 }
1392 default:
1393 goto end_nosignal;
1394 }
1395
1396 end_nosignal:
1397 /*
1398 * Return 1 to indicate success since the 0 value can be a socket
1399 * shutdown during the recv() or send() call.
1400 */
1401 ret_func = 1;
1402 goto end;
1403 error_fatal:
1404 /* This will issue a consumer stop. */
1405 ret_func = -1;
1406 goto end;
1407 end_msg_sessiond:
1408 /*
1409 * The returned value here is not useful since either way we'll return 1 to
1410 * the caller because the session daemon socket management is done
1411 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1412 */
1413 {
1414 int ret_send_status;
1415
1416 ret_send_status = consumer_send_status_msg(sock, ret_code);
1417 if (ret_send_status < 0) {
1418 goto error_fatal;
1419 }
1420 }
1421
1422 ret_func = 1;
1423
1424 end:
1425 health_code_update();
1426 rcu_read_unlock();
1427 return ret_func;
1428 }
1429
1430 /*
1431 * Sync metadata meaning request them to the session daemon and snapshot to the
1432 * metadata thread can consumer them.
1433 *
1434 * Metadata stream lock MUST be acquired.
1435 */
1436 enum sync_metadata_status lttng_kconsumer_sync_metadata(
1437 struct lttng_consumer_stream *metadata)
1438 {
1439 int ret;
1440 enum sync_metadata_status status;
1441
1442 LTTNG_ASSERT(metadata);
1443
1444 ret = kernctl_buffer_flush(metadata->wait_fd);
1445 if (ret < 0) {
1446 ERR("Failed to flush kernel stream");
1447 status = SYNC_METADATA_STATUS_ERROR;
1448 goto end;
1449 }
1450
1451 ret = kernctl_snapshot(metadata->wait_fd);
1452 if (ret < 0) {
1453 if (errno == EAGAIN) {
1454 /* No new metadata, exit. */
1455 DBG("Sync metadata, no new kernel metadata");
1456 status = SYNC_METADATA_STATUS_NO_DATA;
1457 } else {
1458 ERR("Sync metadata, taking kernel snapshot failed.");
1459 status = SYNC_METADATA_STATUS_ERROR;
1460 }
1461 } else {
1462 status = SYNC_METADATA_STATUS_NEW_DATA;
1463 }
1464
1465 end:
1466 return status;
1467 }
1468
1469 static
1470 int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1471 struct stream_subbuffer *subbuf)
1472 {
1473 int ret;
1474
1475 ret = kernctl_get_subbuf_size(
1476 stream->wait_fd, &subbuf->info.data.subbuf_size);
1477 if (ret) {
1478 goto end;
1479 }
1480
1481 ret = kernctl_get_padded_subbuf_size(
1482 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1483 if (ret) {
1484 goto end;
1485 }
1486
1487 end:
1488 return ret;
1489 }
1490
1491 static
1492 int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1493 struct stream_subbuffer *subbuf)
1494 {
1495 int ret;
1496
1497 ret = extract_common_subbuffer_info(stream, subbuf);
1498 if (ret) {
1499 goto end;
1500 }
1501
1502 ret = kernctl_get_metadata_version(
1503 stream->wait_fd, &subbuf->info.metadata.version);
1504 if (ret) {
1505 goto end;
1506 }
1507
1508 end:
1509 return ret;
1510 }
1511
/*
 * Extract the information of a data sub-buffer: common sizes, packet and
 * content sizes, begin/end timestamps, discarded event count and, when the
 * tracer supports them, the sequence number, stream id and instance id.
 *
 * Returns 0 on success, a negative kernctl error code on failure.
 */
static
int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
		struct stream_subbuffer *subbuf)
{
	int ret;

	/* Sub-buffer and padded sizes, common to data and metadata streams. */
	ret = extract_common_subbuffer_info(stream, subbuf);
	if (ret) {
		goto end;
	}

	ret = kernctl_get_packet_size(
			stream->wait_fd, &subbuf->info.data.packet_size);
	if (ret < 0) {
		PERROR("Failed to get sub-buffer packet size");
		goto end;
	}

	ret = kernctl_get_content_size(
			stream->wait_fd, &subbuf->info.data.content_size);
	if (ret < 0) {
		PERROR("Failed to get sub-buffer content size");
		goto end;
	}

	ret = kernctl_get_timestamp_begin(
			stream->wait_fd, &subbuf->info.data.timestamp_begin);
	if (ret < 0) {
		PERROR("Failed to get sub-buffer begin timestamp");
		goto end;
	}

	ret = kernctl_get_timestamp_end(
			stream->wait_fd, &subbuf->info.data.timestamp_end);
	if (ret < 0) {
		PERROR("Failed to get sub-buffer end timestamp");
		goto end;
	}

	ret = kernctl_get_events_discarded(
			stream->wait_fd, &subbuf->info.data.events_discarded);
	if (ret) {
		PERROR("Failed to get sub-buffer events discarded count");
		goto end;
	}

	/*
	 * -ENOTTY indicates the ioctl is not implemented by the tracer; the
	 * optional fields below are simply left unset in that case.
	 */
	ret = kernctl_get_sequence_number(stream->wait_fd,
			&subbuf->info.data.sequence_number.value);
	if (ret) {
		/* May not be supported by older LTTng-modules. */
		if (ret != -ENOTTY) {
			PERROR("Failed to get sub-buffer sequence number");
			goto end;
		}
	} else {
		subbuf->info.data.sequence_number.is_set = true;
	}

	ret = kernctl_get_stream_id(
			stream->wait_fd, &subbuf->info.data.stream_id);
	if (ret < 0) {
		PERROR("Failed to get stream id");
		goto end;
	}

	ret = kernctl_get_instance_id(stream->wait_fd,
			&subbuf->info.data.stream_instance_id.value);
	if (ret) {
		/* May not be supported by older LTTng-modules. */
		if (ret != -ENOTTY) {
			PERROR("Failed to get stream instance id");
			goto end;
		}
	} else {
		subbuf->info.data.stream_instance_id.is_set = true;
	}
end:
	return ret;
}
1591
1592 static
1593 enum get_next_subbuffer_status get_subbuffer_common(
1594 struct lttng_consumer_stream *stream,
1595 struct stream_subbuffer *subbuffer)
1596 {
1597 int ret;
1598 enum get_next_subbuffer_status status;
1599
1600 ret = kernctl_get_next_subbuf(stream->wait_fd);
1601 switch (ret) {
1602 case 0:
1603 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1604 break;
1605 case -ENODATA:
1606 case -EAGAIN:
1607 /*
1608 * The caller only expects -ENODATA when there is no data to
1609 * read, but the kernel tracer returns -EAGAIN when there is
1610 * currently no data for a non-finalized stream, and -ENODATA
1611 * when there is no data for a finalized stream. Those can be
1612 * combined into a -ENODATA return value.
1613 */
1614 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1615 goto end;
1616 default:
1617 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1618 goto end;
1619 }
1620
1621 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1622 stream, subbuffer);
1623 if (ret) {
1624 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1625 }
1626 end:
1627 return status;
1628 }
1629
1630 static
1631 enum get_next_subbuffer_status get_next_subbuffer_splice(
1632 struct lttng_consumer_stream *stream,
1633 struct stream_subbuffer *subbuffer)
1634 {
1635 const enum get_next_subbuffer_status status =
1636 get_subbuffer_common(stream, subbuffer);
1637
1638 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1639 goto end;
1640 }
1641
1642 subbuffer->buffer.fd = stream->wait_fd;
1643 end:
1644 return status;
1645 }
1646
1647 static
1648 enum get_next_subbuffer_status get_next_subbuffer_mmap(
1649 struct lttng_consumer_stream *stream,
1650 struct stream_subbuffer *subbuffer)
1651 {
1652 int ret;
1653 enum get_next_subbuffer_status status;
1654 const char *addr;
1655
1656 status = get_subbuffer_common(stream, subbuffer);
1657 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1658 goto end;
1659 }
1660
1661 ret = get_current_subbuf_addr(stream, &addr);
1662 if (ret) {
1663 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1664 goto end;
1665 }
1666
1667 subbuffer->buffer.buffer = lttng_buffer_view_init(
1668 addr, 0, subbuffer->info.data.padded_subbuf_size);
1669 end:
1670 return status;
1671 }
1672
/*
 * Get the next metadata sub-buffer and record whether the metadata it holds
 * is "coherent", i.e. parseable up to that point by a live viewer.
 */
static
enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
		struct stream_subbuffer *subbuffer)
{
	int ret;
	const char *addr;
	bool coherent;
	enum get_next_subbuffer_status status;

	ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
			&coherent);
	if (ret) {
		goto end;
	}

	ret = stream->read_subbuffer_ops.extract_subbuffer_info(
			stream, subbuffer);
	if (ret) {
		goto end;
	}

	LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);

	ret = get_current_subbuf_addr(stream, &addr);
	if (ret) {
		goto end;
	}

	/*
	 * NOTE(review): padded_subbuf_size is read through info.data here but
	 * logged through info.metadata below — presumably both alias the same
	 * field of a union; confirm against the stream_subbuffer declaration.
	 */
	subbuffer->buffer.buffer = lttng_buffer_view_init(
			addr, 0, subbuffer->info.data.padded_subbuf_size);
	DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
			subbuffer->info.metadata.padded_subbuf_size,
			coherent ? "true" : "false");
end:
	/*
	 * The caller only expects -ENODATA when there is no data to read, but
	 * the kernel tracer returns -EAGAIN when there is currently no data
	 * for a non-finalized stream, and -ENODATA when there is no data for a
	 * finalized stream. Those can be combined into a -ENODATA return value.
	 */
	switch (ret) {
	case 0:
		status = GET_NEXT_SUBBUFFER_STATUS_OK;
		break;
	case -ENODATA:
	case -EAGAIN:
		/* See comment above the switch. */
		status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
		break;
	default:
		status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
		break;
	}

	return status;
}
1735
1736 static
1737 int put_next_subbuffer(struct lttng_consumer_stream *stream,
1738 struct stream_subbuffer *subbuffer __attribute__((unused)))
1739 {
1740 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1741
1742 if (ret) {
1743 if (ret == -EFAULT) {
1744 PERROR("Error in unreserving sub buffer");
1745 } else if (ret == -EIO) {
1746 /* Should never happen with newer LTTng versions */
1747 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
1748 }
1749 }
1750
1751 return ret;
1752 }
1753
1754 static
1755 bool is_get_next_check_metadata_available(int tracer_fd)
1756 {
1757 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1758 const bool available = ret != -ENOTTY;
1759
1760 if (ret == 0) {
1761 /* get succeeded, make sure to put the subbuffer. */
1762 kernctl_put_subbuf(tracer_fd);
1763 }
1764
1765 return available;
1766 }
1767
1768 static
1769 int signal_metadata(struct lttng_consumer_stream *stream,
1770 struct lttng_consumer_local_data *ctx __attribute__((unused)))
1771 {
1772 ASSERT_LOCKED(stream->metadata_rdv_lock);
1773 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1774 }
1775
/*
 * Select the read_subbuffer_ops callbacks of a stream based on its type
 * (metadata vs. data), its channel's output mode (mmap vs. splice) and
 * whether it belongs to a live session.
 *
 * Returns 0 on success, a non-zero value on error.
 */
static
int lttng_kconsumer_set_stream_ops(
		struct lttng_consumer_stream *stream)
{
	int ret = 0;

	if (stream->metadata_flag && stream->chan->is_live) {
		/*
		 * Live metadata: prefer the "metadata check" getter, which
		 * reports whether the metadata reached a coherent (parseable)
		 * point, so it can be bucketized for live viewers.
		 */
		DBG("Attempting to enable metadata bucketization for live consumers");
		if (is_get_next_check_metadata_available(stream->wait_fd)) {
			DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
			stream->read_subbuffer_ops.get_next_subbuffer =
					get_next_subbuffer_metadata_check;
			ret = consumer_stream_enable_metadata_bucketization(
					stream);
			if (ret) {
				goto end;
			}
		} else {
			/*
			 * The kernel tracer version is too old to indicate
			 * when the metadata stream has reached a "coherent"
			 * (parseable) point.
			 *
			 * This means that a live viewer may see an incoherent
			 * sequence of metadata and fail to parse it.
			 */
			WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
			metadata_bucket_destroy(stream->metadata_bucket);
			stream->metadata_bucket = NULL;
		}

		/* Signal waiters on the metadata rendez-vous point (see signal_metadata). */
		stream->read_subbuffer_ops.on_sleep = signal_metadata;
	}

	/* Not set above: pick the getter matching the channel's output type. */
	if (!stream->read_subbuffer_ops.get_next_subbuffer) {
		if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
			stream->read_subbuffer_ops.get_next_subbuffer =
					get_next_subbuffer_mmap;
		} else {
			stream->read_subbuffer_ops.get_next_subbuffer =
					get_next_subbuffer_splice;
		}
	}

	if (stream->metadata_flag) {
		stream->read_subbuffer_ops.extract_subbuffer_info =
				extract_metadata_subbuffer_info;
	} else {
		stream->read_subbuffer_ops.extract_subbuffer_info =
				extract_data_subbuffer_info;
		if (stream->chan->is_live) {
			/* Live data streams also flush indexes as beacons. */
			stream->read_subbuffer_ops.send_live_beacon =
					consumer_flush_kernel_index;
		}
	}

	stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
end:
	return ret;
}
1836
/*
 * Prepare a stream received from the session daemon: create its output
 * files when tracing locally, map the ring buffer when the channel uses
 * the mmap output type and wire up its read callbacks.
 *
 * Returns 0 on success (the FD is then handled by the library), a negative
 * value on error.
 */
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	LTTNG_ASSERT(stream);

	/*
	 * Don't create anything if this is set for streaming or if there is
	 * no current trace chunk on the parent channel.
	 */
	if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
			stream->chan->trace_chunk) {
		ret = consumer_stream_create_output_files(stream, true);
		if (ret) {
			goto error;
		}
	}

	if (stream->output == LTTNG_EVENT_MMAP) {
		/* get the len of the mmap region */
		unsigned long mmap_len;

		ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
		if (ret != 0) {
			PERROR("kernctl_get_mmap_len");
			goto error_close_fd;
		}
		stream->mmap_len = (size_t) mmap_len;

		/* Map the tracer's ring buffer read-only into this process. */
		stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
				MAP_PRIVATE, stream->wait_fd, 0);
		if (stream->mmap_base == MAP_FAILED) {
			PERROR("Error mmaping");
			ret = -1;
			goto error_close_fd;
		}
	}

	ret = lttng_kconsumer_set_stream_ops(stream);
	if (ret) {
		goto error_close_fd;
	}

	/* we return 0 to let the library handle the FD internally */
	return 0;

error_close_fd:
	/* Undo the output-file creation on failure. */
	if (stream->out_fd >= 0) {
		int err;

		err = close(stream->out_fd);
		LTTNG_ASSERT(!err);
		stream->out_fd = -1;
	}
error:
	return ret;
}
1894
1895 /*
1896 * Check if data is still being extracted from the buffers for a specific
1897 * stream. Consumer data lock MUST be acquired before calling this function
1898 * and the stream lock.
1899 *
1900 * Return 1 if the traced data are still getting read else 0 meaning that the
1901 * data is available for trace viewer reading.
1902 */
1903 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
1904 {
1905 int ret;
1906
1907 LTTNG_ASSERT(stream);
1908
1909 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1910 ret = 0;
1911 goto end;
1912 }
1913
1914 ret = kernctl_get_next_subbuf(stream->wait_fd);
1915 if (ret == 0) {
1916 /* There is still data so let's put back this subbuffer. */
1917 ret = kernctl_put_subbuf(stream->wait_fd);
1918 LTTNG_ASSERT(ret == 0);
1919 ret = 1; /* Data is pending */
1920 goto end;
1921 }
1922
1923 /* Data is NOT pending and ready to be read. */
1924 ret = 0;
1925
1926 end:
1927 return ret;
1928 }