b0aa99ec1ce25ffc45242b4ffcc0cd6acf0375da
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include "kernel-consumer.hpp"
12
13 #include <common/buffer-view.hpp>
14 #include <common/common.hpp>
15 #include <common/compat/endian.hpp>
16 #include <common/compat/fcntl.hpp>
17 #include <common/consumer/consumer-stream.hpp>
18 #include <common/consumer/consumer-timer.hpp>
19 #include <common/consumer/consumer.hpp>
20 #include <common/consumer/metadata-bucket.hpp>
21 #include <common/index/index.hpp>
22 #include <common/kernel-ctl/kernel-ctl.hpp>
23 #include <common/optional.hpp>
24 #include <common/pipe.hpp>
25 #include <common/relayd/relayd.hpp>
26 #include <common/sessiond-comm/relayd.hpp>
27 #include <common/sessiond-comm/sessiond-comm.hpp>
28 #include <common/urcu.hpp>
29 #include <common/utils.hpp>
30
31 #include <bin/lttng-consumerd/health-consumerd.hpp>
32 #include <inttypes.h>
33 #include <poll.h>
34 #include <pthread.h>
35 #include <stdint.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <sys/mman.h>
39 #include <sys/socket.h>
40 #include <sys/stat.h>
41 #include <sys/types.h>
42 #include <unistd.h>
43
44 extern struct lttng_consumer_global_data the_consumer_data;
45 extern int consumer_poll_timeout;
46
47 /*
48 * Take a snapshot for a specific fd
49 *
50 * Returns 0 on success, < 0 on error
51 */
52 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
53 {
54 int ret = 0;
55 int infd = stream->wait_fd;
56
57 ret = kernctl_snapshot(infd);
58 /*
59 * -EAGAIN is not an error, it just means that there is no data to
60 * be read.
61 */
62 if (ret != 0 && ret != -EAGAIN) {
63 PERROR("Getting sub-buffer snapshot.");
64 }
65
66 return ret;
67 }
68
69 /*
70 * Sample consumed and produced positions for a specific fd.
71 *
72 * Returns 0 on success, < 0 on error.
73 */
74 int lttng_kconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
75 {
76 LTTNG_ASSERT(stream);
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79 }
80
81 /*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
86 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
87 {
88 int ret;
89 int infd = stream->wait_fd;
90
91 ret = kernctl_snapshot_get_produced(infd, pos);
92 if (ret != 0) {
93 PERROR("kernctl_snapshot_get_produced");
94 }
95
96 return ret;
97 }
98
99 /*
100 * Get the consumerd position
101 *
102 * Returns 0 on success, < 0 on error
103 */
104 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
105 {
106 int ret;
107 int infd = stream->wait_fd;
108
109 ret = kernctl_snapshot_get_consumed(infd, pos);
110 if (ret != 0) {
111 PERROR("kernctl_snapshot_get_consumed");
112 }
113
114 return ret;
115 }
116
117 static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr)
118 {
119 int ret;
120 unsigned long mmap_offset;
121 const char *mmap_base = (const char *) stream->mmap_base;
122
123 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
124 if (ret < 0) {
125 PERROR("Failed to get mmap read offset");
126 goto error;
127 }
128
129 *addr = mmap_base + mmap_offset;
130 error:
131 return ret;
132 }
133
134 /*
135 * Take a snapshot of all the stream of a channel
136 * RCU read-side lock must be held across this function to ensure existence of
137 * channel.
138 *
139 * Returns 0 on success, < 0 on error
140 */
141 static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *channel,
142 uint64_t key,
143 char *path,
144 uint64_t relayd_id,
145 uint64_t nb_packets_per_stream)
146 {
147 int ret;
148 struct lttng_consumer_stream *stream;
149
150 DBG("Kernel consumer snapshot channel %" PRIu64, key);
151
152 /* Prevent channel modifications while we perform the snapshot.*/
153 pthread_mutex_lock(&channel->lock);
154
155 lttng::urcu::read_lock_guard read_lock;
156
157 /* Splice is not supported yet for channel snapshot. */
158 if (channel->output != CONSUMER_CHANNEL_MMAP) {
159 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
160 channel->name);
161 ret = -1;
162 goto end;
163 }
164
165 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
166 unsigned long consumed_pos, produced_pos;
167
168 health_code_update();
169
170 /*
171 * Lock stream because we are about to change its state.
172 */
173 pthread_mutex_lock(&stream->lock);
174
175 LTTNG_ASSERT(channel->trace_chunk);
176 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
177 /*
178 * Can't happen barring an internal error as the channel
179 * holds a reference to the trace chunk.
180 */
181 ERR("Failed to acquire reference to channel's trace chunk");
182 ret = -1;
183 goto end_unlock;
184 }
185 LTTNG_ASSERT(!stream->trace_chunk);
186 stream->trace_chunk = channel->trace_chunk;
187
188 /*
189 * Assign the received relayd ID so we can use it for streaming. The streams
190 * are not visible to anyone so this is OK to change it.
191 */
192 stream->net_seq_idx = relayd_id;
193 channel->relayd_id = relayd_id;
194 if (relayd_id != (uint64_t) -1ULL) {
195 ret = consumer_send_relayd_stream(stream, path);
196 if (ret < 0) {
197 ERR("sending stream to relayd");
198 goto error_close_stream_output;
199 }
200 } else {
201 ret = consumer_stream_create_output_files(stream, false);
202 if (ret < 0) {
203 goto error_close_stream_output;
204 }
205 DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key);
206 }
207
208 ret = kernctl_buffer_flush_empty(stream->wait_fd);
209 if (ret < 0) {
210 /*
211 * Doing a buffer flush which does not take into
212 * account empty packets. This is not perfect
213 * for stream intersection, but required as a
214 * fall-back when "flush_empty" is not
215 * implemented by lttng-modules.
216 */
217 ret = kernctl_buffer_flush(stream->wait_fd);
218 if (ret < 0) {
219 ERR("Failed to flush kernel stream");
220 goto error_close_stream_output;
221 }
222 goto end_unlock;
223 }
224
225 ret = lttng_kconsumer_take_snapshot(stream);
226 if (ret < 0) {
227 ERR("Taking kernel snapshot");
228 goto error_close_stream_output;
229 }
230
231 ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
232 if (ret < 0) {
233 ERR("Produced kernel snapshot position");
234 goto error_close_stream_output;
235 }
236
237 ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
238 if (ret < 0) {
239 ERR("Consumerd kernel snapshot position");
240 goto error_close_stream_output;
241 }
242
243 consumed_pos = consumer_get_consume_start_pos(
244 consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size);
245
246 while ((long) (consumed_pos - produced_pos) < 0) {
247 ssize_t read_len;
248 unsigned long len, padded_len;
249 const char *subbuf_addr;
250 struct lttng_buffer_view subbuf_view;
251
252 health_code_update();
253 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
254
255 ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
256 if (ret < 0) {
257 if (ret != -EAGAIN) {
258 PERROR("kernctl_get_subbuf snapshot");
259 goto error_close_stream_output;
260 }
261 DBG("Kernel consumer get subbuf failed. Skipping it.");
262 consumed_pos += stream->max_sb_size;
263 stream->chan->lost_packets++;
264 continue;
265 }
266
267 ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
268 if (ret < 0) {
269 ERR("Snapshot kernctl_get_subbuf_size");
270 goto error_put_subbuf;
271 }
272
273 ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
274 if (ret < 0) {
275 ERR("Snapshot kernctl_get_padded_subbuf_size");
276 goto error_put_subbuf;
277 }
278
279 ret = get_current_subbuf_addr(stream, &subbuf_addr);
280 if (ret) {
281 goto error_put_subbuf;
282 }
283
284 subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
285 read_len = lttng_consumer_on_read_subbuffer_mmap(
286 stream, &subbuf_view, padded_len - len);
287 /*
288 * We write the padded len in local tracefiles but the data len
289 * when using a relay. Display the error but continue processing
290 * to try to release the subbuffer.
291 */
292 if (relayd_id != (uint64_t) -1ULL) {
293 if (read_len != len) {
294 ERR("Error sending to the relay (ret: %zd != len: %lu)",
295 read_len,
296 len);
297 }
298 } else {
299 if (read_len != padded_len) {
300 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
301 read_len,
302 padded_len);
303 }
304 }
305
306 ret = kernctl_put_subbuf(stream->wait_fd);
307 if (ret < 0) {
308 ERR("Snapshot kernctl_put_subbuf");
309 goto error_close_stream_output;
310 }
311 consumed_pos += stream->max_sb_size;
312 }
313
314 consumer_stream_close_output(stream);
315 pthread_mutex_unlock(&stream->lock);
316 }
317
318 /* All good! */
319 ret = 0;
320 goto end;
321
322 error_put_subbuf:
323 ret = kernctl_put_subbuf(stream->wait_fd);
324 if (ret < 0) {
325 ERR("Snapshot kernctl_put_subbuf error path");
326 }
327 error_close_stream_output:
328 consumer_stream_close_output(stream);
329 end_unlock:
330 pthread_mutex_unlock(&stream->lock);
331 end:
332 pthread_mutex_unlock(&channel->lock);
333 return ret;
334 }
335
336 /*
337 * Read the whole metadata available for a snapshot.
338 * RCU read-side lock must be held across this function to ensure existence of
339 * metadata_channel.
340 *
341 * Returns 0 on success, < 0 on error
342 */
343 static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
344 uint64_t key,
345 char *path,
346 uint64_t relayd_id,
347 struct lttng_consumer_local_data *ctx)
348 {
349 int ret, use_relayd = 0;
350 ssize_t ret_read;
351 struct lttng_consumer_stream *metadata_stream;
352
353 LTTNG_ASSERT(ctx);
354
355 DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
356
357 lttng::urcu::read_lock_guard read_lock;
358
359 metadata_stream = metadata_channel->metadata_stream;
360 LTTNG_ASSERT(metadata_stream);
361
362 metadata_stream->read_subbuffer_ops.lock(metadata_stream);
363 LTTNG_ASSERT(metadata_channel->trace_chunk);
364 LTTNG_ASSERT(metadata_stream->trace_chunk);
365
366 /* Flag once that we have a valid relayd for the stream. */
367 if (relayd_id != (uint64_t) -1ULL) {
368 use_relayd = 1;
369 }
370
371 if (use_relayd) {
372 ret = consumer_send_relayd_stream(metadata_stream, path);
373 if (ret < 0) {
374 goto error_snapshot;
375 }
376 } else {
377 ret = consumer_stream_create_output_files(metadata_stream, false);
378 if (ret < 0) {
379 goto error_snapshot;
380 }
381 }
382
383 do {
384 health_code_update();
385
386 ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
387 if (ret_read < 0) {
388 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", ret_read);
389 ret = ret_read;
390 goto error_snapshot;
391 }
392 } while (ret_read > 0);
393
394 if (use_relayd) {
395 close_relayd_stream(metadata_stream);
396 metadata_stream->net_seq_idx = (uint64_t) -1ULL;
397 } else {
398 if (metadata_stream->out_fd >= 0) {
399 ret = close(metadata_stream->out_fd);
400 if (ret < 0) {
401 PERROR("Kernel consumer snapshot metadata close out_fd");
402 /*
403 * Don't go on error here since the snapshot was successful at this
404 * point but somehow the close failed.
405 */
406 }
407 metadata_stream->out_fd = -1;
408 lttng_trace_chunk_put(metadata_stream->trace_chunk);
409 metadata_stream->trace_chunk = nullptr;
410 }
411 }
412
413 ret = 0;
414 error_snapshot:
415 metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
416 consumer_stream_destroy(metadata_stream, nullptr);
417 metadata_channel->metadata_stream = nullptr;
418 return ret;
419 }
420
421 /*
422 * Receive command from session daemon and process it.
423 *
424 * Return 1 on success else a negative value or 0.
425 */
426 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
427 int sock,
428 struct pollfd *consumer_sockpoll)
429 {
430 int ret_func;
431 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
432 struct lttcomm_consumer_msg msg;
433
434 health_code_update();
435
436 {
437 ssize_t ret_recv;
438
439 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
440 if (ret_recv != sizeof(msg)) {
441 if (ret_recv > 0) {
442 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
443 ret_recv = -1;
444 }
445 return ret_recv;
446 }
447 }
448
449 health_code_update();
450
451 /* Deprecated command */
452 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
453
454 health_code_update();
455
456 /* relayd needs RCU read-side protection */
457 lttng::urcu::read_lock_guard read_lock;
458
459 switch (msg.cmd_type) {
460 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
461 {
462 uint32_t major = msg.u.relayd_sock.major;
463 uint32_t minor = msg.u.relayd_sock.minor;
464 enum lttcomm_sock_proto protocol =
465 (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
466
467 /* Session daemon status message are handled in the following call. */
468 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
469 msg.u.relayd_sock.type,
470 ctx,
471 sock,
472 consumer_sockpoll,
473 msg.u.relayd_sock.session_id,
474 msg.u.relayd_sock.relayd_session_id,
475 major,
476 minor,
477 protocol);
478 goto end_nosignal;
479 }
480 case LTTNG_CONSUMER_ADD_CHANNEL:
481 {
482 struct lttng_consumer_channel *new_channel;
483 int ret_send_status, ret_add_channel = 0;
484 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
485
486 health_code_update();
487
488 /* First send a status message before receiving the fds. */
489 ret_send_status = consumer_send_status_msg(sock, ret_code);
490 if (ret_send_status < 0) {
491 /* Somehow, the session daemon is not responding anymore. */
492 goto error_fatal;
493 }
494
495 health_code_update();
496
497 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
498 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
499 msg.u.channel.session_id,
500 msg.u.channel.chunk_id.is_set ? &chunk_id :
501 nullptr,
502 msg.u.channel.pathname,
503 msg.u.channel.name,
504 msg.u.channel.relayd_id,
505 msg.u.channel.output,
506 msg.u.channel.tracefile_size,
507 msg.u.channel.tracefile_count,
508 0,
509 msg.u.channel.monitor,
510 msg.u.channel.live_timer_interval,
511 msg.u.channel.is_live,
512 nullptr,
513 nullptr);
514 if (new_channel == nullptr) {
515 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
516 goto end_nosignal;
517 }
518 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
519 switch (msg.u.channel.output) {
520 case LTTNG_EVENT_SPLICE:
521 new_channel->output = CONSUMER_CHANNEL_SPLICE;
522 break;
523 case LTTNG_EVENT_MMAP:
524 new_channel->output = CONSUMER_CHANNEL_MMAP;
525 break;
526 default:
527 ERR("Channel output unknown %d", msg.u.channel.output);
528 goto end_nosignal;
529 }
530
531 /* Translate and save channel type. */
532 switch (msg.u.channel.type) {
533 case CONSUMER_CHANNEL_TYPE_DATA:
534 case CONSUMER_CHANNEL_TYPE_METADATA:
535 new_channel->type = (consumer_channel_type) msg.u.channel.type;
536 break;
537 default:
538 abort();
539 goto end_nosignal;
540 };
541
542 health_code_update();
543
544 if (ctx->on_recv_channel != nullptr) {
545 int ret_recv_channel = ctx->on_recv_channel(new_channel);
546 if (ret_recv_channel == 0) {
547 ret_add_channel = consumer_add_channel(new_channel, ctx);
548 } else if (ret_recv_channel < 0) {
549 goto end_nosignal;
550 }
551 } else {
552 ret_add_channel = consumer_add_channel(new_channel, ctx);
553 }
554 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret_add_channel) {
555 int monitor_start_ret;
556
557 DBG("Consumer starting monitor timer");
558 consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval);
559 monitor_start_ret = consumer_timer_monitor_start(
560 new_channel, msg.u.channel.monitor_timer_interval);
561 if (monitor_start_ret < 0) {
562 ERR("Starting channel monitoring timer failed");
563 goto end_nosignal;
564 }
565 }
566
567 health_code_update();
568
569 /* If we received an error in add_channel, we need to report it. */
570 if (ret_add_channel < 0) {
571 ret_send_status = consumer_send_status_msg(sock, ret_add_channel);
572 if (ret_send_status < 0) {
573 goto error_fatal;
574 }
575 goto end_nosignal;
576 }
577
578 goto end_nosignal;
579 }
580 case LTTNG_CONSUMER_ADD_STREAM:
581 {
582 int fd;
583 struct lttng_pipe *stream_pipe;
584 struct lttng_consumer_stream *new_stream;
585 struct lttng_consumer_channel *channel;
586 int alloc_ret = 0;
587 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
588 ssize_t ret_pipe_write, ret_recv;
589
590 /*
591 * Get stream's channel reference. Needed when adding the stream to the
592 * global hash table.
593 */
594 channel = consumer_find_channel(msg.u.stream.channel_key);
595 if (!channel) {
596 /*
597 * We could not find the channel. Can happen if cpu hotplug
598 * happens while tearing down.
599 */
600 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
601 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
602 }
603
604 health_code_update();
605
606 /* First send a status message before receiving the fds. */
607 ret_send_status = consumer_send_status_msg(sock, ret_code);
608 if (ret_send_status < 0) {
609 /* Somehow, the session daemon is not responding anymore. */
610 goto error_add_stream_fatal;
611 }
612
613 health_code_update();
614
615 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
616 /* Channel was not found. */
617 goto error_add_stream_nosignal;
618 }
619
620 /* Blocking call */
621 health_poll_entry();
622 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
623 health_poll_exit();
624 if (ret_poll) {
625 goto error_add_stream_fatal;
626 }
627
628 health_code_update();
629
630 /* Get stream file descriptor from socket */
631 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
632 if (ret_recv != sizeof(fd)) {
633 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
634 ret_func = ret_recv;
635 goto end;
636 }
637
638 health_code_update();
639
640 /*
641 * Send status code to session daemon only if the recv works. If the
642 * above recv() failed, the session daemon is notified through the
643 * error socket and the teardown is eventually done.
644 */
645 ret_send_status = consumer_send_status_msg(sock, ret_code);
646 if (ret_send_status < 0) {
647 /* Somehow, the session daemon is not responding anymore. */
648 goto error_add_stream_nosignal;
649 }
650
651 health_code_update();
652
653 pthread_mutex_lock(&channel->lock);
654 new_stream = consumer_stream_create(channel,
655 channel->key,
656 fd,
657 channel->name,
658 channel->relayd_id,
659 channel->session_id,
660 channel->trace_chunk,
661 msg.u.stream.cpu,
662 &alloc_ret,
663 channel->type,
664 channel->monitor);
665 if (new_stream == nullptr) {
666 switch (alloc_ret) {
667 case -ENOMEM:
668 case -EINVAL:
669 default:
670 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
671 break;
672 }
673 pthread_mutex_unlock(&channel->lock);
674 goto error_add_stream_nosignal;
675 }
676
677 new_stream->wait_fd = fd;
678 ret_get_max_subbuf_size =
679 kernctl_get_max_subbuf_size(new_stream->wait_fd, &new_stream->max_sb_size);
680 if (ret_get_max_subbuf_size < 0) {
681 pthread_mutex_unlock(&channel->lock);
682 ERR("Failed to get kernel maximal subbuffer size");
683 goto error_add_stream_nosignal;
684 }
685
686 consumer_stream_update_channel_attributes(new_stream, channel);
687
688 /*
689 * We've just assigned the channel to the stream so increment the
690 * refcount right now. We don't need to increment the refcount for
691 * streams in no monitor because we handle manually the cleanup of
692 * those. It is very important to make sure there is NO prior
693 * consumer_del_stream() calls or else the refcount will be unbalanced.
694 */
695 if (channel->monitor) {
696 uatomic_inc(&new_stream->chan->refcount);
697 }
698
699 /*
700 * The buffer flush is done on the session daemon side for the kernel
701 * so no need for the stream "hangup_flush_done" variable to be
702 * tracked. This is important for a kernel stream since we don't rely
703 * on the flush state of the stream to read data. It's not the case for
704 * user space tracing.
705 */
706 new_stream->hangup_flush_done = 0;
707
708 health_code_update();
709
710 pthread_mutex_lock(&new_stream->lock);
711 if (ctx->on_recv_stream) {
712 int ret_recv_stream = ctx->on_recv_stream(new_stream);
713 if (ret_recv_stream < 0) {
714 pthread_mutex_unlock(&new_stream->lock);
715 pthread_mutex_unlock(&channel->lock);
716 consumer_stream_free(new_stream);
717 goto error_add_stream_nosignal;
718 }
719 }
720 health_code_update();
721
722 if (new_stream->metadata_flag) {
723 channel->metadata_stream = new_stream;
724 }
725
726 /* Do not monitor this stream. */
727 if (!channel->monitor) {
728 DBG("Kernel consumer add stream %s in no monitor mode with "
729 "relayd id %" PRIu64,
730 new_stream->name,
731 new_stream->net_seq_idx);
732 cds_list_add(&new_stream->send_node, &channel->streams.head);
733 pthread_mutex_unlock(&new_stream->lock);
734 pthread_mutex_unlock(&channel->lock);
735 goto end_add_stream;
736 }
737
738 /* Send stream to relayd if the stream has an ID. */
739 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
740 int ret_send_relayd_stream;
741
742 ret_send_relayd_stream =
743 consumer_send_relayd_stream(new_stream, new_stream->chan->pathname);
744 if (ret_send_relayd_stream < 0) {
745 pthread_mutex_unlock(&new_stream->lock);
746 pthread_mutex_unlock(&channel->lock);
747 consumer_stream_free(new_stream);
748 goto error_add_stream_nosignal;
749 }
750
751 /*
752 * If adding an extra stream to an already
753 * existing channel (e.g. cpu hotplug), we need
754 * to send the "streams_sent" command to relayd.
755 */
756 if (channel->streams_sent_to_relayd) {
757 int ret_send_relayd_streams_sent;
758
759 ret_send_relayd_streams_sent =
760 consumer_send_relayd_streams_sent(new_stream->net_seq_idx);
761 if (ret_send_relayd_streams_sent < 0) {
762 pthread_mutex_unlock(&new_stream->lock);
763 pthread_mutex_unlock(&channel->lock);
764 goto error_add_stream_nosignal;
765 }
766 }
767 }
768 pthread_mutex_unlock(&new_stream->lock);
769 pthread_mutex_unlock(&channel->lock);
770
771 /* Get the right pipe where the stream will be sent. */
772 if (new_stream->metadata_flag) {
773 consumer_add_metadata_stream(new_stream);
774 stream_pipe = ctx->consumer_metadata_pipe;
775 } else {
776 consumer_add_data_stream(new_stream);
777 stream_pipe = ctx->consumer_data_pipe;
778 }
779
780 /* Visible to other threads */
781 new_stream->globally_visible = 1;
782
783 health_code_update();
784
785 ret_pipe_write =
786 lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); /* NOLINT
787 sizeof
788 used on a
789 pointer.
790 */
791 if (ret_pipe_write < 0) {
792 ERR("Consumer write %s stream to pipe %d",
793 new_stream->metadata_flag ? "metadata" : "data",
794 lttng_pipe_get_writefd(stream_pipe));
795 if (new_stream->metadata_flag) {
796 consumer_del_stream_for_metadata(new_stream);
797 } else {
798 consumer_del_stream_for_data(new_stream);
799 }
800 goto error_add_stream_nosignal;
801 }
802
803 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
804 new_stream->name,
805 fd,
806 new_stream->chan->pathname,
807 new_stream->relayd_stream_id);
808 end_add_stream:
809 break;
810 error_add_stream_nosignal:
811 goto end_nosignal;
812 error_add_stream_fatal:
813 goto error_fatal;
814 }
815 case LTTNG_CONSUMER_STREAMS_SENT:
816 {
817 struct lttng_consumer_channel *channel;
818 int ret_send_status;
819
820 /*
821 * Get stream's channel reference. Needed when adding the stream to the
822 * global hash table.
823 */
824 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
825 if (!channel) {
826 /*
827 * We could not find the channel. Can happen if cpu hotplug
828 * happens while tearing down.
829 */
830 ERR("Unable to find channel key %" PRIu64, msg.u.sent_streams.channel_key);
831 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
832 }
833
834 health_code_update();
835
836 /*
837 * Send status code to session daemon.
838 */
839 ret_send_status = consumer_send_status_msg(sock, ret_code);
840 if (ret_send_status < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
841 /* Somehow, the session daemon is not responding anymore. */
842 goto error_streams_sent_nosignal;
843 }
844
845 health_code_update();
846
847 /*
848 * We should not send this message if we don't monitor the
849 * streams in this channel.
850 */
851 if (!channel->monitor) {
852 goto end_error_streams_sent;
853 }
854
855 health_code_update();
856 /* Send stream to relayd if the stream has an ID. */
857 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
858 int ret_send_relay_streams;
859
860 ret_send_relay_streams =
861 consumer_send_relayd_streams_sent(msg.u.sent_streams.net_seq_idx);
862 if (ret_send_relay_streams < 0) {
863 goto error_streams_sent_nosignal;
864 }
865 channel->streams_sent_to_relayd = true;
866 }
867 end_error_streams_sent:
868 break;
869 error_streams_sent_nosignal:
870 goto end_nosignal;
871 }
872 case LTTNG_CONSUMER_UPDATE_STREAM:
873 {
874 return -ENOSYS;
875 }
876 case LTTNG_CONSUMER_DESTROY_RELAYD:
877 {
878 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
879 struct consumer_relayd_sock_pair *relayd;
880 int ret_send_status;
881
882 DBG("Kernel consumer destroying relayd %" PRIu64, index);
883
884 /* Get relayd reference if exists. */
885 relayd = consumer_find_relayd(index);
886 if (relayd == nullptr) {
887 DBG("Unable to find relayd %" PRIu64, index);
888 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
889 }
890
891 /*
892 * Each relayd socket pair has a refcount of stream attached to it
893 * which tells if the relayd is still active or not depending on the
894 * refcount value.
895 *
896 * This will set the destroy flag of the relayd object and destroy it
897 * if the refcount reaches zero when called.
898 *
899 * The destroy can happen either here or when a stream fd hangs up.
900 */
901 if (relayd) {
902 consumer_flag_relayd_for_destroy(relayd);
903 }
904
905 health_code_update();
906
907 ret_send_status = consumer_send_status_msg(sock, ret_code);
908 if (ret_send_status < 0) {
909 /* Somehow, the session daemon is not responding anymore. */
910 goto error_fatal;
911 }
912
913 goto end_nosignal;
914 }
915 case LTTNG_CONSUMER_DATA_PENDING:
916 {
917 int32_t ret_data_pending;
918 uint64_t id = msg.u.data_pending.session_id;
919 ssize_t ret_send;
920
921 DBG("Kernel consumer data pending command for id %" PRIu64, id);
922
923 ret_data_pending = consumer_data_pending(id);
924
925 health_code_update();
926
927 /* Send back returned value to session daemon */
928 ret_send =
929 lttcomm_send_unix_sock(sock, &ret_data_pending, sizeof(ret_data_pending));
930 if (ret_send < 0) {
931 PERROR("send data pending ret code");
932 goto error_fatal;
933 }
934
935 /*
936 * No need to send back a status message since the data pending
937 * returned value is the response.
938 */
939 break;
940 }
941 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
942 {
943 struct lttng_consumer_channel *channel;
944 uint64_t key = msg.u.snapshot_channel.key;
945 int ret_send_status;
946
947 channel = consumer_find_channel(key);
948 if (!channel) {
949 ERR("Channel %" PRIu64 " not found", key);
950 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
951 } else {
952 if (msg.u.snapshot_channel.metadata == 1) {
953 int ret_snapshot;
954
955 ret_snapshot = lttng_kconsumer_snapshot_metadata(
956 channel,
957 key,
958 msg.u.snapshot_channel.pathname,
959 msg.u.snapshot_channel.relayd_id,
960 ctx);
961 if (ret_snapshot < 0) {
962 ERR("Snapshot metadata failed");
963 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
964 }
965 } else {
966 int ret_snapshot;
967
968 ret_snapshot = lttng_kconsumer_snapshot_channel(
969 channel,
970 key,
971 msg.u.snapshot_channel.pathname,
972 msg.u.snapshot_channel.relayd_id,
973 msg.u.snapshot_channel.nb_packets_per_stream);
974 if (ret_snapshot < 0) {
975 ERR("Snapshot channel failed");
976 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
977 }
978 }
979 }
980 health_code_update();
981
982 ret_send_status = consumer_send_status_msg(sock, ret_code);
983 if (ret_send_status < 0) {
984 /* Somehow, the session daemon is not responding anymore. */
985 goto end_nosignal;
986 }
987 break;
988 }
989 case LTTNG_CONSUMER_DESTROY_CHANNEL:
990 {
991 uint64_t key = msg.u.destroy_channel.key;
992 struct lttng_consumer_channel *channel;
993 int ret_send_status;
994
995 channel = consumer_find_channel(key);
996 if (!channel) {
997 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
998 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
999 }
1000
1001 health_code_update();
1002
1003 ret_send_status = consumer_send_status_msg(sock, ret_code);
1004 if (ret_send_status < 0) {
1005 /* Somehow, the session daemon is not responding anymore. */
1006 goto end_destroy_channel;
1007 }
1008
1009 health_code_update();
1010
1011 /* Stop right now if no channel was found. */
1012 if (!channel) {
1013 goto end_destroy_channel;
1014 }
1015
1016 /*
1017 * This command should ONLY be issued for channel with streams set in
1018 * no monitor mode.
1019 */
1020 LTTNG_ASSERT(!channel->monitor);
1021
1022 /*
1023 * The refcount should ALWAYS be 0 in the case of a channel in no
1024 * monitor mode.
1025 */
1026 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
1027
1028 consumer_del_channel(channel);
1029 end_destroy_channel:
1030 goto end_nosignal;
1031 }
1032 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1033 {
1034 ssize_t ret;
1035 uint64_t count;
1036 struct lttng_consumer_channel *channel;
1037 uint64_t id = msg.u.discarded_events.session_id;
1038 uint64_t key = msg.u.discarded_events.channel_key;
1039
1040 DBG("Kernel consumer discarded events command for session id %" PRIu64
1041 ", channel key %" PRIu64,
1042 id,
1043 key);
1044
1045 channel = consumer_find_channel(key);
1046 if (!channel) {
1047 ERR("Kernel consumer discarded events channel %" PRIu64 " not found", key);
1048 count = 0;
1049 } else {
1050 count = channel->discarded_events;
1051 }
1052
1053 health_code_update();
1054
1055 /* Send back returned value to session daemon */
1056 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1057 if (ret < 0) {
1058 PERROR("send discarded events");
1059 goto error_fatal;
1060 }
1061
1062 break;
1063 }
1064 case LTTNG_CONSUMER_LOST_PACKETS:
1065 {
1066 ssize_t ret;
1067 uint64_t count;
1068 struct lttng_consumer_channel *channel;
1069 uint64_t id = msg.u.lost_packets.session_id;
1070 uint64_t key = msg.u.lost_packets.channel_key;
1071
1072 DBG("Kernel consumer lost packets command for session id %" PRIu64
1073 ", channel key %" PRIu64,
1074 id,
1075 key);
1076
1077 channel = consumer_find_channel(key);
1078 if (!channel) {
1079 ERR("Kernel consumer lost packets channel %" PRIu64 " not found", key);
1080 count = 0;
1081 } else {
1082 count = channel->lost_packets;
1083 }
1084
1085 health_code_update();
1086
1087 /* Send back returned value to session daemon */
1088 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1089 if (ret < 0) {
1090 PERROR("send lost packets");
1091 goto error_fatal;
1092 }
1093
1094 break;
1095 }
1096 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1097 {
1098 int channel_monitor_pipe;
1099 int ret_send_status, ret_set_channel_monitor_pipe;
1100 ssize_t ret_recv;
1101
1102 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1103 /* Successfully received the command's type. */
1104 ret_send_status = consumer_send_status_msg(sock, ret_code);
1105 if (ret_send_status < 0) {
1106 goto error_fatal;
1107 }
1108
1109 ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1);
1110 if (ret_recv != sizeof(channel_monitor_pipe)) {
1111 ERR("Failed to receive channel monitor pipe");
1112 goto error_fatal;
1113 }
1114
1115 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1116 ret_set_channel_monitor_pipe =
1117 consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe);
1118 if (!ret_set_channel_monitor_pipe) {
1119 int flags;
1120 int ret_fcntl;
1121
1122 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1123 /* Set the pipe as non-blocking. */
1124 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1125 if (ret_fcntl == -1) {
1126 PERROR("fcntl get flags of the channel monitoring pipe");
1127 goto error_fatal;
1128 }
1129 flags = ret_fcntl;
1130
1131 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK);
1132 if (ret_fcntl == -1) {
1133 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1134 goto error_fatal;
1135 }
1136 DBG("Channel monitor pipe set as non-blocking");
1137 } else {
1138 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1139 }
1140 ret_send_status = consumer_send_status_msg(sock, ret_code);
1141 if (ret_send_status < 0) {
1142 goto error_fatal;
1143 }
1144 break;
1145 }
1146 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1147 {
1148 struct lttng_consumer_channel *channel;
1149 uint64_t key = msg.u.rotate_channel.key;
1150 int ret_send_status;
1151
1152 DBG("Consumer rotate channel %" PRIu64, key);
1153
1154 channel = consumer_find_channel(key);
1155 if (!channel) {
1156 ERR("Channel %" PRIu64 " not found", key);
1157 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1158 } else {
1159 /*
1160 * Sample the rotate position of all the streams in this channel.
1161 */
1162 int ret_rotate_channel;
1163
1164 ret_rotate_channel = lttng_consumer_rotate_channel(
1165 channel, key, msg.u.rotate_channel.relayd_id);
1166 if (ret_rotate_channel < 0) {
1167 ERR("Rotate channel failed");
1168 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1169 }
1170
1171 health_code_update();
1172 }
1173
1174 ret_send_status = consumer_send_status_msg(sock, ret_code);
1175 if (ret_send_status < 0) {
1176 /* Somehow, the session daemon is not responding anymore. */
1177 goto error_rotate_channel;
1178 }
1179 if (channel) {
1180 /* Rotate the streams that are ready right now. */
1181 int ret_rotate;
1182
1183 ret_rotate = lttng_consumer_rotate_ready_streams(channel, key);
1184 if (ret_rotate < 0) {
1185 ERR("Rotate ready streams failed");
1186 }
1187 }
1188 break;
1189 error_rotate_channel:
1190 goto end_nosignal;
1191 }
1192 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1193 {
1194 struct lttng_consumer_channel *channel;
1195 uint64_t key = msg.u.clear_channel.key;
1196 int ret_send_status;
1197
1198 channel = consumer_find_channel(key);
1199 if (!channel) {
1200 DBG("Channel %" PRIu64 " not found", key);
1201 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1202 } else {
1203 int ret_clear_channel;
1204
1205 ret_clear_channel = lttng_consumer_clear_channel(channel);
1206 if (ret_clear_channel) {
1207 ERR("Clear channel failed");
1208 ret_code = (lttcomm_return_code) ret_clear_channel;
1209 }
1210
1211 health_code_update();
1212 }
1213
1214 ret_send_status = consumer_send_status_msg(sock, ret_code);
1215 if (ret_send_status < 0) {
1216 /* Somehow, the session daemon is not responding anymore. */
1217 goto end_nosignal;
1218 }
1219
1220 break;
1221 }
1222 case LTTNG_CONSUMER_INIT:
1223 {
1224 int ret_send_status;
1225 lttng_uuid sessiond_uuid;
1226
1227 std::copy(std::begin(msg.u.init.sessiond_uuid),
1228 std::end(msg.u.init.sessiond_uuid),
1229 sessiond_uuid.begin());
1230
1231 ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
1232 health_code_update();
1233 ret_send_status = consumer_send_status_msg(sock, ret_code);
1234 if (ret_send_status < 0) {
1235 /* Somehow, the session daemon is not responding anymore. */
1236 goto end_nosignal;
1237 }
1238 break;
1239 }
1240 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
1241 {
1242 const struct lttng_credentials credentials = {
1243 .uid = LTTNG_OPTIONAL_INIT_VALUE(
1244 msg.u.create_trace_chunk.credentials.value.uid),
1245 .gid = LTTNG_OPTIONAL_INIT_VALUE(
1246 msg.u.create_trace_chunk.credentials.value.gid),
1247 };
1248 const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set;
1249 const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
1250 const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
1251 msg.u.create_trace_chunk.override_name :
1252 nullptr;
1253 struct lttng_directory_handle *chunk_directory_handle = nullptr;
1254
1255 /*
1256 * The session daemon will only provide a chunk directory file
1257 * descriptor for local traces.
1258 */
1259 if (is_local_trace) {
1260 int chunk_dirfd;
1261 int ret_send_status;
1262 ssize_t ret_recv;
1263
1264 /* Acnowledge the reception of the command. */
1265 ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
1266 if (ret_send_status < 0) {
1267 /* Somehow, the session daemon is not responding anymore. */
1268 goto end_nosignal;
1269 }
1270
1271 ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
1272 if (ret_recv != sizeof(chunk_dirfd)) {
1273 ERR("Failed to receive trace chunk directory file descriptor");
1274 goto error_fatal;
1275 }
1276
1277 DBG("Received trace chunk directory fd (%d)", chunk_dirfd);
1278 chunk_directory_handle =
1279 lttng_directory_handle_create_from_dirfd(chunk_dirfd);
1280 if (!chunk_directory_handle) {
1281 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1282 if (close(chunk_dirfd)) {
1283 PERROR("Failed to close chunk directory file descriptor");
1284 }
1285 goto error_fatal;
1286 }
1287 }
1288
1289 ret_code = lttng_consumer_create_trace_chunk(
1290 !is_local_trace ? &relayd_id : nullptr,
1291 msg.u.create_trace_chunk.session_id,
1292 msg.u.create_trace_chunk.chunk_id,
1293 (time_t) msg.u.create_trace_chunk.creation_timestamp,
1294 chunk_override_name,
1295 msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr,
1296 chunk_directory_handle);
1297 lttng_directory_handle_put(chunk_directory_handle);
1298 goto end_msg_sessiond;
1299 }
1300 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
1301 {
1302 enum lttng_trace_chunk_command_type close_command =
1303 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
1304 const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value;
1305 struct lttcomm_consumer_close_trace_chunk_reply reply;
1306 char path[LTTNG_PATH_MAX];
1307 ssize_t ret_send;
1308
1309 ret_code = lttng_consumer_close_trace_chunk(
1310 msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr,
1311 msg.u.close_trace_chunk.session_id,
1312 msg.u.close_trace_chunk.chunk_id,
1313 (time_t) msg.u.close_trace_chunk.close_timestamp,
1314 msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr,
1315 path);
1316 reply.ret_code = ret_code;
1317 reply.path_length = strlen(path) + 1;
1318 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1319 if (ret_send != sizeof(reply)) {
1320 goto error_fatal;
1321 }
1322 ret_send = lttcomm_send_unix_sock(sock, path, reply.path_length);
1323 if (ret_send != reply.path_length) {
1324 goto error_fatal;
1325 }
1326 goto end_nosignal;
1327 }
1328 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
1329 {
1330 const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
1331
1332 ret_code = lttng_consumer_trace_chunk_exists(
1333 msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr,
1334 msg.u.trace_chunk_exists.session_id,
1335 msg.u.trace_chunk_exists.chunk_id);
1336 goto end_msg_sessiond;
1337 }
1338 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1339 {
1340 const uint64_t key = msg.u.open_channel_packets.key;
1341 struct lttng_consumer_channel *channel = consumer_find_channel(key);
1342
1343 if (channel) {
1344 pthread_mutex_lock(&channel->lock);
1345 ret_code = lttng_consumer_open_channel_packets(channel);
1346 pthread_mutex_unlock(&channel->lock);
1347 } else {
1348 WARN("Channel %" PRIu64 " not found", key);
1349 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1350 }
1351
1352 health_code_update();
1353 goto end_msg_sessiond;
1354 }
1355 default:
1356 goto end_nosignal;
1357 }
1358
1359 end_nosignal:
1360 /*
1361 * Return 1 to indicate success since the 0 value can be a socket
1362 * shutdown during the recv() or send() call.
1363 */
1364 ret_func = 1;
1365 goto end;
1366 error_fatal:
1367 /* This will issue a consumer stop. */
1368 ret_func = -1;
1369 goto end;
1370 end_msg_sessiond:
1371 /*
1372 * The returned value here is not useful since either way we'll return 1 to
1373 * the caller because the session daemon socket management is done
1374 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1375 */
1376 {
1377 int ret_send_status;
1378
1379 ret_send_status = consumer_send_status_msg(sock, ret_code);
1380 if (ret_send_status < 0) {
1381 goto error_fatal;
1382 }
1383 }
1384
1385 ret_func = 1;
1386
1387 end:
1388 health_code_update();
1389 return ret_func;
1390 }
1391
1392 /*
1393 * Sync metadata meaning request them to the session daemon and snapshot to the
1394 * metadata thread can consumer them.
1395 *
1396 * Metadata stream lock MUST be acquired.
1397 */
1398 enum sync_metadata_status lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
1399 {
1400 int ret;
1401 enum sync_metadata_status status;
1402
1403 LTTNG_ASSERT(metadata);
1404
1405 ret = kernctl_buffer_flush(metadata->wait_fd);
1406 if (ret < 0) {
1407 ERR("Failed to flush kernel stream");
1408 status = SYNC_METADATA_STATUS_ERROR;
1409 goto end;
1410 }
1411
1412 ret = kernctl_snapshot(metadata->wait_fd);
1413 if (ret < 0) {
1414 if (errno == EAGAIN) {
1415 /* No new metadata, exit. */
1416 DBG("Sync metadata, no new kernel metadata");
1417 status = SYNC_METADATA_STATUS_NO_DATA;
1418 } else {
1419 ERR("Sync metadata, taking kernel snapshot failed.");
1420 status = SYNC_METADATA_STATUS_ERROR;
1421 }
1422 } else {
1423 status = SYNC_METADATA_STATUS_NEW_DATA;
1424 }
1425
1426 end:
1427 return status;
1428 }
1429
1430 static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1431 struct stream_subbuffer *subbuf)
1432 {
1433 int ret;
1434
1435 ret = kernctl_get_subbuf_size(stream->wait_fd, &subbuf->info.data.subbuf_size);
1436 if (ret) {
1437 goto end;
1438 }
1439
1440 ret = kernctl_get_padded_subbuf_size(stream->wait_fd,
1441 &subbuf->info.data.padded_subbuf_size);
1442 if (ret) {
1443 goto end;
1444 }
1445
1446 end:
1447 return ret;
1448 }
1449
1450 static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1451 struct stream_subbuffer *subbuf)
1452 {
1453 int ret;
1454
1455 ret = extract_common_subbuffer_info(stream, subbuf);
1456 if (ret) {
1457 goto end;
1458 }
1459
1460 ret = kernctl_get_metadata_version(stream->wait_fd, &subbuf->info.metadata.version);
1461 if (ret) {
1462 goto end;
1463 }
1464
1465 end:
1466 return ret;
1467 }
1468
1469 static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1470 struct stream_subbuffer *subbuf)
1471 {
1472 int ret;
1473
1474 ret = extract_common_subbuffer_info(stream, subbuf);
1475 if (ret) {
1476 goto end;
1477 }
1478
1479 ret = kernctl_get_packet_size(stream->wait_fd, &subbuf->info.data.packet_size);
1480 if (ret < 0) {
1481 PERROR("Failed to get sub-buffer packet size");
1482 goto end;
1483 }
1484
1485 ret = kernctl_get_content_size(stream->wait_fd, &subbuf->info.data.content_size);
1486 if (ret < 0) {
1487 PERROR("Failed to get sub-buffer content size");
1488 goto end;
1489 }
1490
1491 ret = kernctl_get_timestamp_begin(stream->wait_fd, &subbuf->info.data.timestamp_begin);
1492 if (ret < 0) {
1493 PERROR("Failed to get sub-buffer begin timestamp");
1494 goto end;
1495 }
1496
1497 ret = kernctl_get_timestamp_end(stream->wait_fd, &subbuf->info.data.timestamp_end);
1498 if (ret < 0) {
1499 PERROR("Failed to get sub-buffer end timestamp");
1500 goto end;
1501 }
1502
1503 ret = kernctl_get_events_discarded(stream->wait_fd, &subbuf->info.data.events_discarded);
1504 if (ret) {
1505 PERROR("Failed to get sub-buffer events discarded count");
1506 goto end;
1507 }
1508
1509 ret = kernctl_get_sequence_number(stream->wait_fd,
1510 &subbuf->info.data.sequence_number.value);
1511 if (ret) {
1512 /* May not be supported by older LTTng-modules. */
1513 if (ret != -ENOTTY) {
1514 PERROR("Failed to get sub-buffer sequence number");
1515 goto end;
1516 }
1517 } else {
1518 subbuf->info.data.sequence_number.is_set = true;
1519 }
1520
1521 ret = kernctl_get_stream_id(stream->wait_fd, &subbuf->info.data.stream_id);
1522 if (ret < 0) {
1523 PERROR("Failed to get stream id");
1524 goto end;
1525 }
1526
1527 ret = kernctl_get_instance_id(stream->wait_fd, &subbuf->info.data.stream_instance_id.value);
1528 if (ret) {
1529 /* May not be supported by older LTTng-modules. */
1530 if (ret != -ENOTTY) {
1531 PERROR("Failed to get stream instance id");
1532 goto end;
1533 }
1534 } else {
1535 subbuf->info.data.stream_instance_id.is_set = true;
1536 }
1537 end:
1538 return ret;
1539 }
1540
1541 static enum get_next_subbuffer_status get_subbuffer_common(struct lttng_consumer_stream *stream,
1542 struct stream_subbuffer *subbuffer)
1543 {
1544 int ret;
1545 enum get_next_subbuffer_status status;
1546
1547 ret = kernctl_get_next_subbuf(stream->wait_fd);
1548 switch (ret) {
1549 case 0:
1550 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1551 break;
1552 case -ENODATA:
1553 case -EAGAIN:
1554 /*
1555 * The caller only expects -ENODATA when there is no data to
1556 * read, but the kernel tracer returns -EAGAIN when there is
1557 * currently no data for a non-finalized stream, and -ENODATA
1558 * when there is no data for a finalized stream. Those can be
1559 * combined into a -ENODATA return value.
1560 */
1561 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1562 goto end;
1563 default:
1564 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1565 goto end;
1566 }
1567
1568 ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
1569 if (ret) {
1570 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1571 }
1572 end:
1573 return status;
1574 }
1575
1576 static enum get_next_subbuffer_status
1577 get_next_subbuffer_splice(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
1578 {
1579 const enum get_next_subbuffer_status status = get_subbuffer_common(stream, subbuffer);
1580
1581 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1582 goto end;
1583 }
1584
1585 subbuffer->buffer.fd = stream->wait_fd;
1586 end:
1587 return status;
1588 }
1589
1590 static enum get_next_subbuffer_status get_next_subbuffer_mmap(struct lttng_consumer_stream *stream,
1591 struct stream_subbuffer *subbuffer)
1592 {
1593 int ret;
1594 enum get_next_subbuffer_status status;
1595 const char *addr;
1596
1597 status = get_subbuffer_common(stream, subbuffer);
1598 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1599 goto end;
1600 }
1601
1602 ret = get_current_subbuf_addr(stream, &addr);
1603 if (ret) {
1604 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1605 goto end;
1606 }
1607
1608 subbuffer->buffer.buffer =
1609 lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
1610 end:
1611 return status;
1612 }
1613
1614 static enum get_next_subbuffer_status
1615 get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
1616 struct stream_subbuffer *subbuffer)
1617 {
1618 int ret;
1619 const char *addr;
1620 bool coherent;
1621 enum get_next_subbuffer_status status;
1622
1623 ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, &coherent);
1624 if (ret) {
1625 goto end;
1626 }
1627
1628 ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
1629 if (ret) {
1630 goto end;
1631 }
1632
1633 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
1634
1635 ret = get_current_subbuf_addr(stream, &addr);
1636 if (ret) {
1637 goto end;
1638 }
1639
1640 subbuffer->buffer.buffer =
1641 lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
1642 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1643 subbuffer->info.metadata.padded_subbuf_size,
1644 coherent ? "true" : "false");
1645 end:
1646 /*
1647 * The caller only expects -ENODATA when there is no data to read, but
1648 * the kernel tracer returns -EAGAIN when there is currently no data
1649 * for a non-finalized stream, and -ENODATA when there is no data for a
1650 * finalized stream. Those can be combined into a -ENODATA return value.
1651 */
1652 switch (ret) {
1653 case 0:
1654 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1655 break;
1656 case -ENODATA:
1657 case -EAGAIN:
1658 /*
1659 * The caller only expects -ENODATA when there is no data to
1660 * read, but the kernel tracer returns -EAGAIN when there is
1661 * currently no data for a non-finalized stream, and -ENODATA
1662 * when there is no data for a finalized stream. Those can be
1663 * combined into a -ENODATA return value.
1664 */
1665 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1666 break;
1667 default:
1668 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1669 break;
1670 }
1671
1672 return status;
1673 }
1674
1675 static int put_next_subbuffer(struct lttng_consumer_stream *stream,
1676 struct stream_subbuffer *subbuffer __attribute__((unused)))
1677 {
1678 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1679
1680 if (ret) {
1681 if (ret == -EFAULT) {
1682 PERROR("Error in unreserving sub buffer");
1683 } else if (ret == -EIO) {
1684 /* Should never happen with newer LTTng versions */
1685 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
1686 }
1687 }
1688
1689 return ret;
1690 }
1691
1692 static bool is_get_next_check_metadata_available(int tracer_fd)
1693 {
1694 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, nullptr);
1695 const bool available = ret != -ENOTTY;
1696
1697 if (ret == 0) {
1698 /* get succeeded, make sure to put the subbuffer. */
1699 kernctl_put_subbuf(tracer_fd);
1700 }
1701
1702 return available;
1703 }
1704
1705 static int signal_metadata(struct lttng_consumer_stream *stream,
1706 struct lttng_consumer_local_data *ctx __attribute__((unused)))
1707 {
1708 ASSERT_LOCKED(stream->metadata_rdv_lock);
1709 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1710 }
1711
1712 static int lttng_kconsumer_set_stream_ops(struct lttng_consumer_stream *stream)
1713 {
1714 int ret = 0;
1715
1716 if (stream->metadata_flag && stream->chan->is_live) {
1717 DBG("Attempting to enable metadata bucketization for live consumers");
1718 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1719 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1720 stream->read_subbuffer_ops.get_next_subbuffer =
1721 get_next_subbuffer_metadata_check;
1722 ret = consumer_stream_enable_metadata_bucketization(stream);
1723 if (ret) {
1724 goto end;
1725 }
1726 } else {
1727 /*
1728 * The kernel tracer version is too old to indicate
1729 * when the metadata stream has reached a "coherent"
1730 * (parseable) point.
1731 *
1732 * This means that a live viewer may see an incoherent
1733 * sequence of metadata and fail to parse it.
1734 */
1735 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1736 metadata_bucket_destroy(stream->metadata_bucket);
1737 stream->metadata_bucket = nullptr;
1738 }
1739
1740 stream->read_subbuffer_ops.on_sleep = signal_metadata;
1741 }
1742
1743 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1744 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1745 stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_mmap;
1746 } else {
1747 stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_splice;
1748 }
1749 }
1750
1751 if (stream->metadata_flag) {
1752 stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info;
1753 } else {
1754 stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info;
1755 if (stream->chan->is_live) {
1756 stream->read_subbuffer_ops.send_live_beacon = consumer_flush_kernel_index;
1757 }
1758 }
1759
1760 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
1761 end:
1762 return ret;
1763 }
1764
1765 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1766 {
1767 int ret;
1768
1769 LTTNG_ASSERT(stream);
1770
1771 /*
1772 * Don't create anything if this is set for streaming or if there is
1773 * no current trace chunk on the parent channel.
1774 */
1775 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1776 stream->chan->trace_chunk) {
1777 ret = consumer_stream_create_output_files(stream, true);
1778 if (ret) {
1779 goto error;
1780 }
1781 }
1782
1783 if (stream->output == LTTNG_EVENT_MMAP) {
1784 /* get the len of the mmap region */
1785 unsigned long mmap_len;
1786
1787 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1788 if (ret != 0) {
1789 PERROR("kernctl_get_mmap_len");
1790 goto error_close_fd;
1791 }
1792 stream->mmap_len = (size_t) mmap_len;
1793
1794 stream->mmap_base =
1795 mmap(nullptr, stream->mmap_len, PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
1796 if (stream->mmap_base == MAP_FAILED) {
1797 PERROR("Error mmaping");
1798 ret = -1;
1799 goto error_close_fd;
1800 }
1801 }
1802
1803 ret = lttng_kconsumer_set_stream_ops(stream);
1804 if (ret) {
1805 goto error_close_fd;
1806 }
1807
1808 /* we return 0 to let the library handle the FD internally */
1809 return 0;
1810
1811 error_close_fd:
1812 if (stream->out_fd >= 0) {
1813 int err;
1814
1815 err = close(stream->out_fd);
1816 LTTNG_ASSERT(!err);
1817 stream->out_fd = -1;
1818 }
1819 error:
1820 return ret;
1821 }
1822
1823 /*
1824 * Check if data is still being extracted from the buffers for a specific
1825 * stream. Consumer data lock MUST be acquired before calling this function
1826 * and the stream lock.
1827 *
1828 * Return 1 if the traced data are still getting read else 0 meaning that the
1829 * data is available for trace viewer reading.
1830 */
1831 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
1832 {
1833 int ret;
1834
1835 LTTNG_ASSERT(stream);
1836
1837 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1838 ret = 0;
1839 goto end;
1840 }
1841
1842 ret = kernctl_get_next_subbuf(stream->wait_fd);
1843 if (ret == 0) {
1844 /* There is still data so let's put back this subbuffer. */
1845 ret = kernctl_put_subbuf(stream->wait_fd);
1846 LTTNG_ASSERT(ret == 0);
1847 ret = 1; /* Data is pending */
1848 goto end;
1849 }
1850
1851 /* Data is NOT pending and ready to be read. */
1852 ret = 0;
1853
1854 end:
1855 return ret;
1856 }
This page took 0.085702 seconds and 4 git commands to generate.