Rename C++ header files to .hpp
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <poll.h>
12 #include <pthread.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <sys/mman.h>
16 #include <sys/socket.h>
17 #include <sys/types.h>
18 #include <inttypes.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21 #include <stdint.h>
22
23 #include <bin/lttng-consumerd/health-consumerd.hpp>
24 #include <common/common.hpp>
25 #include <common/kernel-ctl/kernel-ctl.hpp>
26 #include <common/sessiond-comm/sessiond-comm.hpp>
27 #include <common/sessiond-comm/relayd.hpp>
28 #include <common/compat/fcntl.hpp>
29 #include <common/compat/endian.hpp>
30 #include <common/pipe.hpp>
31 #include <common/relayd/relayd.hpp>
32 #include <common/utils.hpp>
33 #include <common/consumer/consumer-stream.hpp>
34 #include <common/index/index.hpp>
35 #include <common/consumer/consumer-timer.hpp>
36 #include <common/optional.hpp>
37 #include <common/buffer-view.hpp>
38 #include <common/consumer/consumer.hpp>
39 #include <common/consumer/metadata-bucket.hpp>
40
41 #include "kernel-consumer.hpp"
42
43 extern struct lttng_consumer_global_data the_consumer_data;
44 extern int consumer_poll_timeout;
45
46 /*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
51 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
52 {
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
62 PERROR("Getting sub-buffer snapshot.");
63 }
64
65 return ret;
66 }
67
68 /*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73 int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75 {
76 LTTNG_ASSERT(stream);
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79 }
80
81 /*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
86 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
87 unsigned long *pos)
88 {
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
94 PERROR("kernctl_snapshot_get_produced");
95 }
96
97 return ret;
98 }
99
100 /*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107 {
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
113 PERROR("kernctl_snapshot_get_consumed");
114 }
115
116 return ret;
117 }
118
119 static
120 int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122 {
123 int ret;
124 unsigned long mmap_offset;
125 const char *mmap_base = (const char *) stream->mmap_base;
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134 error:
135 return ret;
136 }
137
/*
 * Take a snapshot of all the streams of a channel.
 *
 * RCU read-side lock must be held across this function to ensure existence of
 * channel. The channel lock, then each stream lock, are taken internally; the
 * streams are not globally visible at this point, so mutating their relayd
 * index is safe.
 *
 * @param channel                channel to snapshot (mmap output required)
 * @param key                    channel key, used for logging only
 * @param path                   destination path sent to the relayd
 * @param relayd_id              relayd index, or (uint64_t) -1ULL for a
 *                               local (on-disk) snapshot
 * @param nb_packets_per_stream  maximum number of packets to capture per
 *                               stream (passed to
 *                               consumer_get_consume_start_pos())
 *
 * Returns 0 on success, < 0 on error
 */
static int lttng_kconsumer_snapshot_channel(
		struct lttng_consumer_channel *channel,
		uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream)
{
	int ret;
	struct lttng_consumer_stream *stream;

	DBG("Kernel consumer snapshot channel %" PRIu64, key);

	/* Prevent channel modifications while we perform the snapshot.*/
	pthread_mutex_lock(&channel->lock);

	rcu_read_lock();

	/* Splice is not supported yet for channel snapshot. */
	if (channel->output != CONSUMER_CHANNEL_MMAP) {
		ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
				channel->name);
		ret = -1;
		goto end;
	}

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		unsigned long consumed_pos, produced_pos;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		/*
		 * Borrow a reference to the channel's trace chunk for the
		 * duration of this stream's snapshot; released at the bottom
		 * of the loop body.
		 */
		LTTNG_ASSERT(channel->trace_chunk);
		if (!lttng_trace_chunk_get(channel->trace_chunk)) {
			/*
			 * Can't happen barring an internal error as the channel
			 * holds a reference to the trace chunk.
			 */
			ERR("Failed to acquire reference to channel's trace chunk");
			ret = -1;
			goto end_unlock;
		}
		LTTNG_ASSERT(!stream->trace_chunk);
		stream->trace_chunk = channel->trace_chunk;

		/*
		 * Assign the received relayd ID so we can use it for streaming. The streams
		 * are not visible to anyone so this is OK to change it.
		 */
		stream->net_seq_idx = relayd_id;
		channel->relayd_id = relayd_id;
		if (relayd_id != (uint64_t) -1ULL) {
			/* Network snapshot: announce the stream to the relayd. */
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				ERR("sending stream to relayd");
				goto end_unlock;
			}
		} else {
			/* Local snapshot: create the on-disk output files. */
			ret = consumer_stream_create_output_files(stream,
					false);
			if (ret < 0) {
				goto end_unlock;
			}
			DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
					stream->key);
		}

		ret = kernctl_buffer_flush_empty(stream->wait_fd);
		if (ret < 0) {
			/*
			 * Doing a buffer flush which does not take into
			 * account empty packets. This is not perfect
			 * for stream intersection, but required as a
			 * fall-back when "flush_empty" is not
			 * implemented by lttng-modules.
			 */
			ret = kernctl_buffer_flush(stream->wait_fd);
			if (ret < 0) {
				ERR("Failed to flush kernel stream");
				goto end_unlock;
			}
			/*
			 * NOTE(review): this goto leaves the stream loop even
			 * though the fall-back flush succeeded (ret == 0),
			 * skipping the data capture for this and all remaining
			 * streams while still reporting success. Confirm this
			 * is intentional upstream.
			 */
			goto end_unlock;
		}

		ret = lttng_kconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking kernel snapshot");
			goto end_unlock;
		}

		/* Sample the window of data to capture for this stream. */
		ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced kernel snapshot position");
			goto end_unlock;
		}

		ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd kernel snapshot position");
			goto end_unlock;
		}

		/*
		 * Move the read start position forward so that at most
		 * nb_packets_per_stream packets are captured.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		/*
		 * The cast to signed long makes the comparison robust to
		 * position counter wrap-around: iterate while consumed_pos is
		 * behind produced_pos.
		 */
		while ((long) (consumed_pos - produced_pos) < 0) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();
			DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);

			ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("kernctl_get_subbuf snapshot");
					goto end_unlock;
				}
				/*
				 * -EAGAIN: sub-buffer not available; account
				 * for it as a lost packet and move on.
				 */
				DBG("Kernel consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			/* From here on, the sub-buffer must be put back on error. */
			ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(
					subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(
					stream, &subbuf_view,
					padded_len - len);
			/*
			 * We write the padded len in local tracefiles but the data len
			 * when using a relay. Display the error but continue processing
			 * to try to release the subbuffer.
			 */
			if (relayd_id != (uint64_t) -1ULL) {
				if (read_len != len) {
					ERR("Error sending to the relay (ret: %zd != len: %lu)",
							read_len, len);
				}
			} else {
				if (read_len != padded_len) {
					ERR("Error writing to tracefile (ret: %zd != len: %lu)",
							read_len, padded_len);
				}
			}

			ret = kernctl_put_subbuf(stream->wait_fd);
			if (ret < 0) {
				ERR("Snapshot kernctl_put_subbuf");
				goto end_unlock;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Snapshot of this stream is complete; tear down its output. */
		if (relayd_id == (uint64_t) -1ULL) {
			if (stream->out_fd >= 0) {
				ret = close(stream->out_fd);
				if (ret < 0) {
					PERROR("Kernel consumer snapshot close out_fd");
					goto end_unlock;
				}
				stream->out_fd = -1;
			}
		} else {
			close_relayd_stream(stream);
			stream->net_seq_idx = (uint64_t) -1ULL;
		}
		/* Release the trace chunk reference taken at the top of the loop. */
		lttng_trace_chunk_put(stream->trace_chunk);
		stream->trace_chunk = NULL;
		pthread_mutex_unlock(&stream->lock);
	}

	/* All good! */
	ret = 0;
	goto end;

error_put_subbuf:
	/* Best-effort release of the sub-buffer acquired above. */
	ret = kernctl_put_subbuf(stream->wait_fd);
	if (ret < 0) {
		ERR("Snapshot kernctl_put_subbuf error path");
	}
end_unlock:
	pthread_mutex_unlock(&stream->lock);
end:
	rcu_read_unlock();
	pthread_mutex_unlock(&channel->lock);
	return ret;
}
354
/*
 * Read the whole metadata available for a snapshot.
 *
 * RCU read-side lock must be held across this function to ensure existence of
 * metadata_channel. On return — success or failure — the metadata stream is
 * destroyed and metadata_channel->metadata_stream is set to NULL.
 *
 * @param metadata_channel  metadata channel of the session to snapshot
 * @param key               channel key, used for logging only
 * @param path              destination path sent to the relayd
 * @param relayd_id         relayd index, or (uint64_t) -1ULL for a local
 *                          (on-disk) snapshot
 * @param ctx               consumer local data context
 *
 * Returns 0 on success, < 0 on error
 */
static int lttng_kconsumer_snapshot_metadata(
		struct lttng_consumer_channel *metadata_channel,
		uint64_t key, char *path, uint64_t relayd_id,
		struct lttng_consumer_local_data *ctx)
{
	int ret, use_relayd = 0;
	ssize_t ret_read;
	struct lttng_consumer_stream *metadata_stream;

	LTTNG_ASSERT(ctx);

	DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
			key, path);

	rcu_read_lock();

	metadata_stream = metadata_channel->metadata_stream;
	LTTNG_ASSERT(metadata_stream);

	/* Hold the stream's own lock for the duration of the read-out. */
	metadata_stream->read_subbuffer_ops.lock(metadata_stream);
	LTTNG_ASSERT(metadata_channel->trace_chunk);
	LTTNG_ASSERT(metadata_stream->trace_chunk);

	/* Flag once that we have a valid relayd for the stream. */
	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	if (use_relayd) {
		/* Network snapshot: announce the stream to the relayd. */
		ret = consumer_send_relayd_stream(metadata_stream, path);
		if (ret < 0) {
			goto error_snapshot;
		}
	} else {
		/* Local snapshot: create the on-disk output files. */
		ret = consumer_stream_create_output_files(metadata_stream,
				false);
		if (ret < 0) {
			goto error_snapshot;
		}
	}

	/* Drain the metadata stream until no data remains. */
	do {
		health_code_update();

		ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
		if (ret_read < 0) {
			ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
					ret_read);
			ret = ret_read;
			goto error_snapshot;
		}
	} while (ret_read > 0);

	if (use_relayd) {
		close_relayd_stream(metadata_stream);
		metadata_stream->net_seq_idx = (uint64_t) -1ULL;
	} else {
		if (metadata_stream->out_fd >= 0) {
			ret = close(metadata_stream->out_fd);
			if (ret < 0) {
				PERROR("Kernel consumer snapshot metadata close out_fd");
				/*
				 * Don't go on error here since the snapshot was successful at this
				 * point but somehow the close failed.
				 */
			}
			metadata_stream->out_fd = -1;
			lttng_trace_chunk_put(metadata_stream->trace_chunk);
			metadata_stream->trace_chunk = NULL;
		}
	}

	ret = 0;
error_snapshot:
	/*
	 * Common teardown for both outcomes: unlock, then destroy the
	 * metadata stream and clear the channel's reference to it.
	 */
	metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
	consumer_stream_destroy(metadata_stream, NULL);
	metadata_channel->metadata_stream = NULL;
	rcu_read_unlock();
	return ret;
}
442
443 /*
444 * Receive command from session daemon and process it.
445 *
446 * Return 1 on success else a negative value or 0.
447 */
448 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
449 int sock, struct pollfd *consumer_sockpoll)
450 {
451 int ret_func;
452 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
453 struct lttcomm_consumer_msg msg;
454
455 health_code_update();
456
457 {
458 ssize_t ret_recv;
459
460 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
461 if (ret_recv != sizeof(msg)) {
462 if (ret_recv > 0) {
463 lttng_consumer_send_error(ctx,
464 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
465 ret_recv = -1;
466 }
467 return ret_recv;
468 }
469 }
470
471 health_code_update();
472
473 /* Deprecated command */
474 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
475
476 health_code_update();
477
478 /* relayd needs RCU read-side protection */
479 rcu_read_lock();
480
481 switch (msg.cmd_type) {
482 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
483 {
484 uint32_t major = msg.u.relayd_sock.major;
485 uint32_t minor = msg.u.relayd_sock.minor;
486 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
487 msg.u.relayd_sock.relayd_socket_protocol;
488
489 /* Session daemon status message are handled in the following call. */
490 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
491 msg.u.relayd_sock.type, ctx, sock,
492 consumer_sockpoll, msg.u.relayd_sock.session_id,
493 msg.u.relayd_sock.relayd_session_id, major,
494 minor, protocol);
495 goto end_nosignal;
496 }
497 case LTTNG_CONSUMER_ADD_CHANNEL:
498 {
499 struct lttng_consumer_channel *new_channel;
500 int ret_send_status, ret_add_channel = 0;
501 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
502
503 health_code_update();
504
505 /* First send a status message before receiving the fds. */
506 ret_send_status = consumer_send_status_msg(sock, ret_code);
507 if (ret_send_status < 0) {
508 /* Somehow, the session daemon is not responding anymore. */
509 goto error_fatal;
510 }
511
512 health_code_update();
513
514 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
515 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
516 msg.u.channel.session_id,
517 msg.u.channel.chunk_id.is_set ?
518 &chunk_id : NULL,
519 msg.u.channel.pathname,
520 msg.u.channel.name,
521 msg.u.channel.relayd_id, msg.u.channel.output,
522 msg.u.channel.tracefile_size,
523 msg.u.channel.tracefile_count, 0,
524 msg.u.channel.monitor,
525 msg.u.channel.live_timer_interval,
526 msg.u.channel.is_live,
527 NULL, NULL);
528 if (new_channel == NULL) {
529 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
530 goto end_nosignal;
531 }
532 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
533 switch (msg.u.channel.output) {
534 case LTTNG_EVENT_SPLICE:
535 new_channel->output = CONSUMER_CHANNEL_SPLICE;
536 break;
537 case LTTNG_EVENT_MMAP:
538 new_channel->output = CONSUMER_CHANNEL_MMAP;
539 break;
540 default:
541 ERR("Channel output unknown %d", msg.u.channel.output);
542 goto end_nosignal;
543 }
544
545 /* Translate and save channel type. */
546 switch (msg.u.channel.type) {
547 case CONSUMER_CHANNEL_TYPE_DATA:
548 case CONSUMER_CHANNEL_TYPE_METADATA:
549 new_channel->type = (consumer_channel_type) msg.u.channel.type;
550 break;
551 default:
552 abort();
553 goto end_nosignal;
554 };
555
556 health_code_update();
557
558 if (ctx->on_recv_channel != NULL) {
559 int ret_recv_channel =
560 ctx->on_recv_channel(new_channel);
561 if (ret_recv_channel == 0) {
562 ret_add_channel = consumer_add_channel(
563 new_channel, ctx);
564 } else if (ret_recv_channel < 0) {
565 goto end_nosignal;
566 }
567 } else {
568 ret_add_channel =
569 consumer_add_channel(new_channel, ctx);
570 }
571 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
572 !ret_add_channel) {
573 int monitor_start_ret;
574
575 DBG("Consumer starting monitor timer");
576 consumer_timer_live_start(new_channel,
577 msg.u.channel.live_timer_interval);
578 monitor_start_ret = consumer_timer_monitor_start(
579 new_channel,
580 msg.u.channel.monitor_timer_interval);
581 if (monitor_start_ret < 0) {
582 ERR("Starting channel monitoring timer failed");
583 goto end_nosignal;
584 }
585 }
586
587 health_code_update();
588
589 /* If we received an error in add_channel, we need to report it. */
590 if (ret_add_channel < 0) {
591 ret_send_status = consumer_send_status_msg(
592 sock, ret_add_channel);
593 if (ret_send_status < 0) {
594 goto error_fatal;
595 }
596 goto end_nosignal;
597 }
598
599 goto end_nosignal;
600 }
601 case LTTNG_CONSUMER_ADD_STREAM:
602 {
603 int fd;
604 struct lttng_pipe *stream_pipe;
605 struct lttng_consumer_stream *new_stream;
606 struct lttng_consumer_channel *channel;
607 int alloc_ret = 0;
608 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
609 ssize_t ret_pipe_write, ret_recv;
610
611 /*
612 * Get stream's channel reference. Needed when adding the stream to the
613 * global hash table.
614 */
615 channel = consumer_find_channel(msg.u.stream.channel_key);
616 if (!channel) {
617 /*
618 * We could not find the channel. Can happen if cpu hotplug
619 * happens while tearing down.
620 */
621 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
622 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
623 }
624
625 health_code_update();
626
627 /* First send a status message before receiving the fds. */
628 ret_send_status = consumer_send_status_msg(sock, ret_code);
629 if (ret_send_status < 0) {
630 /* Somehow, the session daemon is not responding anymore. */
631 goto error_add_stream_fatal;
632 }
633
634 health_code_update();
635
636 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
637 /* Channel was not found. */
638 goto error_add_stream_nosignal;
639 }
640
641 /* Blocking call */
642 health_poll_entry();
643 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
644 health_poll_exit();
645 if (ret_poll) {
646 goto error_add_stream_fatal;
647 }
648
649 health_code_update();
650
651 /* Get stream file descriptor from socket */
652 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
653 if (ret_recv != sizeof(fd)) {
654 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
655 ret_func = ret_recv;
656 goto end;
657 }
658
659 health_code_update();
660
661 /*
662 * Send status code to session daemon only if the recv works. If the
663 * above recv() failed, the session daemon is notified through the
664 * error socket and the teardown is eventually done.
665 */
666 ret_send_status = consumer_send_status_msg(sock, ret_code);
667 if (ret_send_status < 0) {
668 /* Somehow, the session daemon is not responding anymore. */
669 goto error_add_stream_nosignal;
670 }
671
672 health_code_update();
673
674 pthread_mutex_lock(&channel->lock);
675 new_stream = consumer_stream_create(
676 channel,
677 channel->key,
678 fd,
679 channel->name,
680 channel->relayd_id,
681 channel->session_id,
682 channel->trace_chunk,
683 msg.u.stream.cpu,
684 &alloc_ret,
685 channel->type,
686 channel->monitor);
687 if (new_stream == NULL) {
688 switch (alloc_ret) {
689 case -ENOMEM:
690 case -EINVAL:
691 default:
692 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
693 break;
694 }
695 pthread_mutex_unlock(&channel->lock);
696 goto error_add_stream_nosignal;
697 }
698
699 new_stream->wait_fd = fd;
700 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
701 new_stream->wait_fd, &new_stream->max_sb_size);
702 if (ret_get_max_subbuf_size < 0) {
703 pthread_mutex_unlock(&channel->lock);
704 ERR("Failed to get kernel maximal subbuffer size");
705 goto error_add_stream_nosignal;
706 }
707
708 consumer_stream_update_channel_attributes(new_stream,
709 channel);
710
711 /*
712 * We've just assigned the channel to the stream so increment the
713 * refcount right now. We don't need to increment the refcount for
714 * streams in no monitor because we handle manually the cleanup of
715 * those. It is very important to make sure there is NO prior
716 * consumer_del_stream() calls or else the refcount will be unbalanced.
717 */
718 if (channel->monitor) {
719 uatomic_inc(&new_stream->chan->refcount);
720 }
721
722 /*
723 * The buffer flush is done on the session daemon side for the kernel
724 * so no need for the stream "hangup_flush_done" variable to be
725 * tracked. This is important for a kernel stream since we don't rely
726 * on the flush state of the stream to read data. It's not the case for
727 * user space tracing.
728 */
729 new_stream->hangup_flush_done = 0;
730
731 health_code_update();
732
733 pthread_mutex_lock(&new_stream->lock);
734 if (ctx->on_recv_stream) {
735 int ret_recv_stream = ctx->on_recv_stream(new_stream);
736 if (ret_recv_stream < 0) {
737 pthread_mutex_unlock(&new_stream->lock);
738 pthread_mutex_unlock(&channel->lock);
739 consumer_stream_free(new_stream);
740 goto error_add_stream_nosignal;
741 }
742 }
743 health_code_update();
744
745 if (new_stream->metadata_flag) {
746 channel->metadata_stream = new_stream;
747 }
748
749 /* Do not monitor this stream. */
750 if (!channel->monitor) {
751 DBG("Kernel consumer add stream %s in no monitor mode with "
752 "relayd id %" PRIu64, new_stream->name,
753 new_stream->net_seq_idx);
754 cds_list_add(&new_stream->send_node, &channel->streams.head);
755 pthread_mutex_unlock(&new_stream->lock);
756 pthread_mutex_unlock(&channel->lock);
757 goto end_add_stream;
758 }
759
760 /* Send stream to relayd if the stream has an ID. */
761 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
762 int ret_send_relayd_stream;
763
764 ret_send_relayd_stream = consumer_send_relayd_stream(
765 new_stream, new_stream->chan->pathname);
766 if (ret_send_relayd_stream < 0) {
767 pthread_mutex_unlock(&new_stream->lock);
768 pthread_mutex_unlock(&channel->lock);
769 consumer_stream_free(new_stream);
770 goto error_add_stream_nosignal;
771 }
772
773 /*
774 * If adding an extra stream to an already
775 * existing channel (e.g. cpu hotplug), we need
776 * to send the "streams_sent" command to relayd.
777 */
778 if (channel->streams_sent_to_relayd) {
779 int ret_send_relayd_streams_sent;
780
781 ret_send_relayd_streams_sent =
782 consumer_send_relayd_streams_sent(
783 new_stream->net_seq_idx);
784 if (ret_send_relayd_streams_sent < 0) {
785 pthread_mutex_unlock(&new_stream->lock);
786 pthread_mutex_unlock(&channel->lock);
787 goto error_add_stream_nosignal;
788 }
789 }
790 }
791 pthread_mutex_unlock(&new_stream->lock);
792 pthread_mutex_unlock(&channel->lock);
793
794 /* Get the right pipe where the stream will be sent. */
795 if (new_stream->metadata_flag) {
796 consumer_add_metadata_stream(new_stream);
797 stream_pipe = ctx->consumer_metadata_pipe;
798 } else {
799 consumer_add_data_stream(new_stream);
800 stream_pipe = ctx->consumer_data_pipe;
801 }
802
803 /* Visible to other threads */
804 new_stream->globally_visible = 1;
805
806 health_code_update();
807
808 ret_pipe_write = lttng_pipe_write(
809 stream_pipe, &new_stream, sizeof(new_stream));
810 if (ret_pipe_write < 0) {
811 ERR("Consumer write %s stream to pipe %d",
812 new_stream->metadata_flag ? "metadata" : "data",
813 lttng_pipe_get_writefd(stream_pipe));
814 if (new_stream->metadata_flag) {
815 consumer_del_stream_for_metadata(new_stream);
816 } else {
817 consumer_del_stream_for_data(new_stream);
818 }
819 goto error_add_stream_nosignal;
820 }
821
822 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
823 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
824 end_add_stream:
825 break;
826 error_add_stream_nosignal:
827 goto end_nosignal;
828 error_add_stream_fatal:
829 goto error_fatal;
830 }
831 case LTTNG_CONSUMER_STREAMS_SENT:
832 {
833 struct lttng_consumer_channel *channel;
834 int ret_send_status;
835
836 /*
837 * Get stream's channel reference. Needed when adding the stream to the
838 * global hash table.
839 */
840 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
841 if (!channel) {
842 /*
843 * We could not find the channel. Can happen if cpu hotplug
844 * happens while tearing down.
845 */
846 ERR("Unable to find channel key %" PRIu64,
847 msg.u.sent_streams.channel_key);
848 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
849 }
850
851 health_code_update();
852
853 /*
854 * Send status code to session daemon.
855 */
856 ret_send_status = consumer_send_status_msg(sock, ret_code);
857 if (ret_send_status < 0 ||
858 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
859 /* Somehow, the session daemon is not responding anymore. */
860 goto error_streams_sent_nosignal;
861 }
862
863 health_code_update();
864
865 /*
866 * We should not send this message if we don't monitor the
867 * streams in this channel.
868 */
869 if (!channel->monitor) {
870 goto end_error_streams_sent;
871 }
872
873 health_code_update();
874 /* Send stream to relayd if the stream has an ID. */
875 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
876 int ret_send_relay_streams;
877
878 ret_send_relay_streams = consumer_send_relayd_streams_sent(
879 msg.u.sent_streams.net_seq_idx);
880 if (ret_send_relay_streams < 0) {
881 goto error_streams_sent_nosignal;
882 }
883 channel->streams_sent_to_relayd = true;
884 }
885 end_error_streams_sent:
886 break;
887 error_streams_sent_nosignal:
888 goto end_nosignal;
889 }
890 case LTTNG_CONSUMER_UPDATE_STREAM:
891 {
892 rcu_read_unlock();
893 return -ENOSYS;
894 }
895 case LTTNG_CONSUMER_DESTROY_RELAYD:
896 {
897 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
898 struct consumer_relayd_sock_pair *relayd;
899 int ret_send_status;
900
901 DBG("Kernel consumer destroying relayd %" PRIu64, index);
902
903 /* Get relayd reference if exists. */
904 relayd = consumer_find_relayd(index);
905 if (relayd == NULL) {
906 DBG("Unable to find relayd %" PRIu64, index);
907 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
908 }
909
910 /*
911 * Each relayd socket pair has a refcount of stream attached to it
912 * which tells if the relayd is still active or not depending on the
913 * refcount value.
914 *
915 * This will set the destroy flag of the relayd object and destroy it
916 * if the refcount reaches zero when called.
917 *
918 * The destroy can happen either here or when a stream fd hangs up.
919 */
920 if (relayd) {
921 consumer_flag_relayd_for_destroy(relayd);
922 }
923
924 health_code_update();
925
926 ret_send_status = consumer_send_status_msg(sock, ret_code);
927 if (ret_send_status < 0) {
928 /* Somehow, the session daemon is not responding anymore. */
929 goto error_fatal;
930 }
931
932 goto end_nosignal;
933 }
934 case LTTNG_CONSUMER_DATA_PENDING:
935 {
936 int32_t ret_data_pending;
937 uint64_t id = msg.u.data_pending.session_id;
938 ssize_t ret_send;
939
940 DBG("Kernel consumer data pending command for id %" PRIu64, id);
941
942 ret_data_pending = consumer_data_pending(id);
943
944 health_code_update();
945
946 /* Send back returned value to session daemon */
947 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
948 sizeof(ret_data_pending));
949 if (ret_send < 0) {
950 PERROR("send data pending ret code");
951 goto error_fatal;
952 }
953
954 /*
955 * No need to send back a status message since the data pending
956 * returned value is the response.
957 */
958 break;
959 }
960 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
961 {
962 struct lttng_consumer_channel *channel;
963 uint64_t key = msg.u.snapshot_channel.key;
964 int ret_send_status;
965
966 channel = consumer_find_channel(key);
967 if (!channel) {
968 ERR("Channel %" PRIu64 " not found", key);
969 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
970 } else {
971 if (msg.u.snapshot_channel.metadata == 1) {
972 int ret_snapshot;
973
974 ret_snapshot = lttng_kconsumer_snapshot_metadata(
975 channel, key,
976 msg.u.snapshot_channel.pathname,
977 msg.u.snapshot_channel.relayd_id,
978 ctx);
979 if (ret_snapshot < 0) {
980 ERR("Snapshot metadata failed");
981 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
982 }
983 } else {
984 int ret_snapshot;
985
986 ret_snapshot = lttng_kconsumer_snapshot_channel(
987 channel, key,
988 msg.u.snapshot_channel.pathname,
989 msg.u.snapshot_channel.relayd_id,
990 msg.u.snapshot_channel
991 .nb_packets_per_stream);
992 if (ret_snapshot < 0) {
993 ERR("Snapshot channel failed");
994 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
995 }
996 }
997 }
998 health_code_update();
999
1000 ret_send_status = consumer_send_status_msg(sock, ret_code);
1001 if (ret_send_status < 0) {
1002 /* Somehow, the session daemon is not responding anymore. */
1003 goto end_nosignal;
1004 }
1005 break;
1006 }
1007 case LTTNG_CONSUMER_DESTROY_CHANNEL:
1008 {
1009 uint64_t key = msg.u.destroy_channel.key;
1010 struct lttng_consumer_channel *channel;
1011 int ret_send_status;
1012
1013 channel = consumer_find_channel(key);
1014 if (!channel) {
1015 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
1016 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1017 }
1018
1019 health_code_update();
1020
1021 ret_send_status = consumer_send_status_msg(sock, ret_code);
1022 if (ret_send_status < 0) {
1023 /* Somehow, the session daemon is not responding anymore. */
1024 goto end_destroy_channel;
1025 }
1026
1027 health_code_update();
1028
1029 /* Stop right now if no channel was found. */
1030 if (!channel) {
1031 goto end_destroy_channel;
1032 }
1033
1034 /*
1035 * This command should ONLY be issued for channel with streams set in
1036 * no monitor mode.
1037 */
1038 LTTNG_ASSERT(!channel->monitor);
1039
1040 /*
1041 * The refcount should ALWAYS be 0 in the case of a channel in no
1042 * monitor mode.
1043 */
1044 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
1045
1046 consumer_del_channel(channel);
1047 end_destroy_channel:
1048 goto end_nosignal;
1049 }
1050 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1051 {
1052 ssize_t ret;
1053 uint64_t count;
1054 struct lttng_consumer_channel *channel;
1055 uint64_t id = msg.u.discarded_events.session_id;
1056 uint64_t key = msg.u.discarded_events.channel_key;
1057
1058 DBG("Kernel consumer discarded events command for session id %"
1059 PRIu64 ", channel key %" PRIu64, id, key);
1060
1061 channel = consumer_find_channel(key);
1062 if (!channel) {
1063 ERR("Kernel consumer discarded events channel %"
1064 PRIu64 " not found", key);
1065 count = 0;
1066 } else {
1067 count = channel->discarded_events;
1068 }
1069
1070 health_code_update();
1071
1072 /* Send back returned value to session daemon */
1073 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1074 if (ret < 0) {
1075 PERROR("send discarded events");
1076 goto error_fatal;
1077 }
1078
1079 break;
1080 }
1081 case LTTNG_CONSUMER_LOST_PACKETS:
1082 {
1083 ssize_t ret;
1084 uint64_t count;
1085 struct lttng_consumer_channel *channel;
1086 uint64_t id = msg.u.lost_packets.session_id;
1087 uint64_t key = msg.u.lost_packets.channel_key;
1088
1089 DBG("Kernel consumer lost packets command for session id %"
1090 PRIu64 ", channel key %" PRIu64, id, key);
1091
1092 channel = consumer_find_channel(key);
1093 if (!channel) {
1094 ERR("Kernel consumer lost packets channel %"
1095 PRIu64 " not found", key);
1096 count = 0;
1097 } else {
1098 count = channel->lost_packets;
1099 }
1100
1101 health_code_update();
1102
1103 /* Send back returned value to session daemon */
1104 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1105 if (ret < 0) {
1106 PERROR("send lost packets");
1107 goto error_fatal;
1108 }
1109
1110 break;
1111 }
1112 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1113 {
1114 int channel_monitor_pipe;
1115 int ret_send_status, ret_set_channel_monitor_pipe;
1116 ssize_t ret_recv;
1117
1118 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1119 /* Successfully received the command's type. */
1120 ret_send_status = consumer_send_status_msg(sock, ret_code);
1121 if (ret_send_status < 0) {
1122 goto error_fatal;
1123 }
1124
1125 ret_recv = lttcomm_recv_fds_unix_sock(
1126 sock, &channel_monitor_pipe, 1);
1127 if (ret_recv != sizeof(channel_monitor_pipe)) {
1128 ERR("Failed to receive channel monitor pipe");
1129 goto error_fatal;
1130 }
1131
1132 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1133 ret_set_channel_monitor_pipe =
1134 consumer_timer_thread_set_channel_monitor_pipe(
1135 channel_monitor_pipe);
1136 if (!ret_set_channel_monitor_pipe) {
1137 int flags;
1138 int ret_fcntl;
1139
1140 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1141 /* Set the pipe as non-blocking. */
1142 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1143 if (ret_fcntl == -1) {
1144 PERROR("fcntl get flags of the channel monitoring pipe");
1145 goto error_fatal;
1146 }
1147 flags = ret_fcntl;
1148
1149 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
1150 flags | O_NONBLOCK);
1151 if (ret_fcntl == -1) {
1152 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1153 goto error_fatal;
1154 }
1155 DBG("Channel monitor pipe set as non-blocking");
1156 } else {
1157 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1158 }
1159 ret_send_status = consumer_send_status_msg(sock, ret_code);
1160 if (ret_send_status < 0) {
1161 goto error_fatal;
1162 }
1163 break;
1164 }
1165 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1166 {
1167 struct lttng_consumer_channel *channel;
1168 uint64_t key = msg.u.rotate_channel.key;
1169 int ret_send_status;
1170
1171 DBG("Consumer rotate channel %" PRIu64, key);
1172
1173 channel = consumer_find_channel(key);
1174 if (!channel) {
1175 ERR("Channel %" PRIu64 " not found", key);
1176 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1177 } else {
1178 /*
1179 * Sample the rotate position of all the streams in this channel.
1180 */
1181 int ret_rotate_channel;
1182
1183 ret_rotate_channel = lttng_consumer_rotate_channel(
1184 channel, key,
1185 msg.u.rotate_channel.relayd_id);
1186 if (ret_rotate_channel < 0) {
1187 ERR("Rotate channel failed");
1188 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1189 }
1190
1191 health_code_update();
1192 }
1193
1194 ret_send_status = consumer_send_status_msg(sock, ret_code);
1195 if (ret_send_status < 0) {
1196 /* Somehow, the session daemon is not responding anymore. */
1197 goto error_rotate_channel;
1198 }
1199 if (channel) {
1200 /* Rotate the streams that are ready right now. */
1201 int ret_rotate;
1202
1203 ret_rotate = lttng_consumer_rotate_ready_streams(
1204 channel, key);
1205 if (ret_rotate < 0) {
1206 ERR("Rotate ready streams failed");
1207 }
1208 }
1209 break;
1210 error_rotate_channel:
1211 goto end_nosignal;
1212 }
1213 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1214 {
1215 struct lttng_consumer_channel *channel;
1216 uint64_t key = msg.u.clear_channel.key;
1217 int ret_send_status;
1218
1219 channel = consumer_find_channel(key);
1220 if (!channel) {
1221 DBG("Channel %" PRIu64 " not found", key);
1222 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1223 } else {
1224 int ret_clear_channel;
1225
1226 ret_clear_channel =
1227 lttng_consumer_clear_channel(channel);
1228 if (ret_clear_channel) {
1229 ERR("Clear channel failed");
1230 ret_code = (lttcomm_return_code) ret_clear_channel;
1231 }
1232
1233 health_code_update();
1234 }
1235
1236 ret_send_status = consumer_send_status_msg(sock, ret_code);
1237 if (ret_send_status < 0) {
1238 /* Somehow, the session daemon is not responding anymore. */
1239 goto end_nosignal;
1240 }
1241
1242 break;
1243 }
1244 case LTTNG_CONSUMER_INIT:
1245 {
1246 int ret_send_status;
1247
1248 ret_code = lttng_consumer_init_command(ctx,
1249 msg.u.init.sessiond_uuid);
1250 health_code_update();
1251 ret_send_status = consumer_send_status_msg(sock, ret_code);
1252 if (ret_send_status < 0) {
1253 /* Somehow, the session daemon is not responding anymore. */
1254 goto end_nosignal;
1255 }
1256 break;
1257 }
1258 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
1259 {
1260 const struct lttng_credentials credentials = {
1261 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1262 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
1263 };
1264 const bool is_local_trace =
1265 !msg.u.create_trace_chunk.relayd_id.is_set;
1266 const uint64_t relayd_id =
1267 msg.u.create_trace_chunk.relayd_id.value;
1268 const char *chunk_override_name =
1269 *msg.u.create_trace_chunk.override_name ?
1270 msg.u.create_trace_chunk.override_name :
1271 NULL;
1272 struct lttng_directory_handle *chunk_directory_handle = NULL;
1273
1274 /*
1275 * The session daemon will only provide a chunk directory file
1276 * descriptor for local traces.
1277 */
1278 if (is_local_trace) {
1279 int chunk_dirfd;
1280 int ret_send_status;
1281 ssize_t ret_recv;
1282
1283 /* Acnowledge the reception of the command. */
1284 ret_send_status = consumer_send_status_msg(
1285 sock, LTTCOMM_CONSUMERD_SUCCESS);
1286 if (ret_send_status < 0) {
1287 /* Somehow, the session daemon is not responding anymore. */
1288 goto end_nosignal;
1289 }
1290
1291 ret_recv = lttcomm_recv_fds_unix_sock(
1292 sock, &chunk_dirfd, 1);
1293 if (ret_recv != sizeof(chunk_dirfd)) {
1294 ERR("Failed to receive trace chunk directory file descriptor");
1295 goto error_fatal;
1296 }
1297
1298 DBG("Received trace chunk directory fd (%d)",
1299 chunk_dirfd);
1300 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
1301 chunk_dirfd);
1302 if (!chunk_directory_handle) {
1303 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1304 if (close(chunk_dirfd)) {
1305 PERROR("Failed to close chunk directory file descriptor");
1306 }
1307 goto error_fatal;
1308 }
1309 }
1310
1311 ret_code = lttng_consumer_create_trace_chunk(
1312 !is_local_trace ? &relayd_id : NULL,
1313 msg.u.create_trace_chunk.session_id,
1314 msg.u.create_trace_chunk.chunk_id,
1315 (time_t) msg.u.create_trace_chunk
1316 .creation_timestamp,
1317 chunk_override_name,
1318 msg.u.create_trace_chunk.credentials.is_set ?
1319 &credentials :
1320 NULL,
1321 chunk_directory_handle);
1322 lttng_directory_handle_put(chunk_directory_handle);
1323 goto end_msg_sessiond;
1324 }
1325 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
1326 {
1327 enum lttng_trace_chunk_command_type close_command =
1328 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
1329 const uint64_t relayd_id =
1330 msg.u.close_trace_chunk.relayd_id.value;
1331 struct lttcomm_consumer_close_trace_chunk_reply reply;
1332 char path[LTTNG_PATH_MAX];
1333 ssize_t ret_send;
1334
1335 ret_code = lttng_consumer_close_trace_chunk(
1336 msg.u.close_trace_chunk.relayd_id.is_set ?
1337 &relayd_id :
1338 NULL,
1339 msg.u.close_trace_chunk.session_id,
1340 msg.u.close_trace_chunk.chunk_id,
1341 (time_t) msg.u.close_trace_chunk.close_timestamp,
1342 msg.u.close_trace_chunk.close_command.is_set ?
1343 &close_command :
1344 NULL, path);
1345 reply.ret_code = ret_code;
1346 reply.path_length = strlen(path) + 1;
1347 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1348 if (ret_send != sizeof(reply)) {
1349 goto error_fatal;
1350 }
1351 ret_send = lttcomm_send_unix_sock(
1352 sock, path, reply.path_length);
1353 if (ret_send != reply.path_length) {
1354 goto error_fatal;
1355 }
1356 goto end_nosignal;
1357 }
1358 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
1359 {
1360 const uint64_t relayd_id =
1361 msg.u.trace_chunk_exists.relayd_id.value;
1362
1363 ret_code = lttng_consumer_trace_chunk_exists(
1364 msg.u.trace_chunk_exists.relayd_id.is_set ?
1365 &relayd_id : NULL,
1366 msg.u.trace_chunk_exists.session_id,
1367 msg.u.trace_chunk_exists.chunk_id);
1368 goto end_msg_sessiond;
1369 }
1370 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1371 {
1372 const uint64_t key = msg.u.open_channel_packets.key;
1373 struct lttng_consumer_channel *channel =
1374 consumer_find_channel(key);
1375
1376 if (channel) {
1377 pthread_mutex_lock(&channel->lock);
1378 ret_code = lttng_consumer_open_channel_packets(channel);
1379 pthread_mutex_unlock(&channel->lock);
1380 } else {
1381 WARN("Channel %" PRIu64 " not found", key);
1382 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1383 }
1384
1385 health_code_update();
1386 goto end_msg_sessiond;
1387 }
1388 default:
1389 goto end_nosignal;
1390 }
1391
1392 end_nosignal:
1393 /*
1394 * Return 1 to indicate success since the 0 value can be a socket
1395 * shutdown during the recv() or send() call.
1396 */
1397 ret_func = 1;
1398 goto end;
1399 error_fatal:
1400 /* This will issue a consumer stop. */
1401 ret_func = -1;
1402 goto end;
1403 end_msg_sessiond:
1404 /*
1405 * The returned value here is not useful since either way we'll return 1 to
1406 * the caller because the session daemon socket management is done
1407 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1408 */
1409 {
1410 int ret_send_status;
1411
1412 ret_send_status = consumer_send_status_msg(sock, ret_code);
1413 if (ret_send_status < 0) {
1414 goto error_fatal;
1415 }
1416 }
1417
1418 ret_func = 1;
1419
1420 end:
1421 health_code_update();
1422 rcu_read_unlock();
1423 return ret_func;
1424 }
1425
1426 /*
1427 * Sync metadata meaning request them to the session daemon and snapshot to the
1428 * metadata thread can consumer them.
1429 *
1430 * Metadata stream lock MUST be acquired.
1431 */
1432 enum sync_metadata_status lttng_kconsumer_sync_metadata(
1433 struct lttng_consumer_stream *metadata)
1434 {
1435 int ret;
1436 enum sync_metadata_status status;
1437
1438 LTTNG_ASSERT(metadata);
1439
1440 ret = kernctl_buffer_flush(metadata->wait_fd);
1441 if (ret < 0) {
1442 ERR("Failed to flush kernel stream");
1443 status = SYNC_METADATA_STATUS_ERROR;
1444 goto end;
1445 }
1446
1447 ret = kernctl_snapshot(metadata->wait_fd);
1448 if (ret < 0) {
1449 if (errno == EAGAIN) {
1450 /* No new metadata, exit. */
1451 DBG("Sync metadata, no new kernel metadata");
1452 status = SYNC_METADATA_STATUS_NO_DATA;
1453 } else {
1454 ERR("Sync metadata, taking kernel snapshot failed.");
1455 status = SYNC_METADATA_STATUS_ERROR;
1456 }
1457 } else {
1458 status = SYNC_METADATA_STATUS_NEW_DATA;
1459 }
1460
1461 end:
1462 return status;
1463 }
1464
1465 static
1466 int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1467 struct stream_subbuffer *subbuf)
1468 {
1469 int ret;
1470
1471 ret = kernctl_get_subbuf_size(
1472 stream->wait_fd, &subbuf->info.data.subbuf_size);
1473 if (ret) {
1474 goto end;
1475 }
1476
1477 ret = kernctl_get_padded_subbuf_size(
1478 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1479 if (ret) {
1480 goto end;
1481 }
1482
1483 end:
1484 return ret;
1485 }
1486
1487 static
1488 int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1489 struct stream_subbuffer *subbuf)
1490 {
1491 int ret;
1492
1493 ret = extract_common_subbuffer_info(stream, subbuf);
1494 if (ret) {
1495 goto end;
1496 }
1497
1498 ret = kernctl_get_metadata_version(
1499 stream->wait_fd, &subbuf->info.metadata.version);
1500 if (ret) {
1501 goto end;
1502 }
1503
1504 end:
1505 return ret;
1506 }
1507
1508 static
1509 int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1510 struct stream_subbuffer *subbuf)
1511 {
1512 int ret;
1513
1514 ret = extract_common_subbuffer_info(stream, subbuf);
1515 if (ret) {
1516 goto end;
1517 }
1518
1519 ret = kernctl_get_packet_size(
1520 stream->wait_fd, &subbuf->info.data.packet_size);
1521 if (ret < 0) {
1522 PERROR("Failed to get sub-buffer packet size");
1523 goto end;
1524 }
1525
1526 ret = kernctl_get_content_size(
1527 stream->wait_fd, &subbuf->info.data.content_size);
1528 if (ret < 0) {
1529 PERROR("Failed to get sub-buffer content size");
1530 goto end;
1531 }
1532
1533 ret = kernctl_get_timestamp_begin(
1534 stream->wait_fd, &subbuf->info.data.timestamp_begin);
1535 if (ret < 0) {
1536 PERROR("Failed to get sub-buffer begin timestamp");
1537 goto end;
1538 }
1539
1540 ret = kernctl_get_timestamp_end(
1541 stream->wait_fd, &subbuf->info.data.timestamp_end);
1542 if (ret < 0) {
1543 PERROR("Failed to get sub-buffer end timestamp");
1544 goto end;
1545 }
1546
1547 ret = kernctl_get_events_discarded(
1548 stream->wait_fd, &subbuf->info.data.events_discarded);
1549 if (ret) {
1550 PERROR("Failed to get sub-buffer events discarded count");
1551 goto end;
1552 }
1553
1554 ret = kernctl_get_sequence_number(stream->wait_fd,
1555 &subbuf->info.data.sequence_number.value);
1556 if (ret) {
1557 /* May not be supported by older LTTng-modules. */
1558 if (ret != -ENOTTY) {
1559 PERROR("Failed to get sub-buffer sequence number");
1560 goto end;
1561 }
1562 } else {
1563 subbuf->info.data.sequence_number.is_set = true;
1564 }
1565
1566 ret = kernctl_get_stream_id(
1567 stream->wait_fd, &subbuf->info.data.stream_id);
1568 if (ret < 0) {
1569 PERROR("Failed to get stream id");
1570 goto end;
1571 }
1572
1573 ret = kernctl_get_instance_id(stream->wait_fd,
1574 &subbuf->info.data.stream_instance_id.value);
1575 if (ret) {
1576 /* May not be supported by older LTTng-modules. */
1577 if (ret != -ENOTTY) {
1578 PERROR("Failed to get stream instance id");
1579 goto end;
1580 }
1581 } else {
1582 subbuf->info.data.stream_instance_id.is_set = true;
1583 }
1584 end:
1585 return ret;
1586 }
1587
1588 static
1589 enum get_next_subbuffer_status get_subbuffer_common(
1590 struct lttng_consumer_stream *stream,
1591 struct stream_subbuffer *subbuffer)
1592 {
1593 int ret;
1594 enum get_next_subbuffer_status status;
1595
1596 ret = kernctl_get_next_subbuf(stream->wait_fd);
1597 switch (ret) {
1598 case 0:
1599 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1600 break;
1601 case -ENODATA:
1602 case -EAGAIN:
1603 /*
1604 * The caller only expects -ENODATA when there is no data to
1605 * read, but the kernel tracer returns -EAGAIN when there is
1606 * currently no data for a non-finalized stream, and -ENODATA
1607 * when there is no data for a finalized stream. Those can be
1608 * combined into a -ENODATA return value.
1609 */
1610 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1611 goto end;
1612 default:
1613 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1614 goto end;
1615 }
1616
1617 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1618 stream, subbuffer);
1619 if (ret) {
1620 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1621 }
1622 end:
1623 return status;
1624 }
1625
1626 static
1627 enum get_next_subbuffer_status get_next_subbuffer_splice(
1628 struct lttng_consumer_stream *stream,
1629 struct stream_subbuffer *subbuffer)
1630 {
1631 const enum get_next_subbuffer_status status =
1632 get_subbuffer_common(stream, subbuffer);
1633
1634 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1635 goto end;
1636 }
1637
1638 subbuffer->buffer.fd = stream->wait_fd;
1639 end:
1640 return status;
1641 }
1642
1643 static
1644 enum get_next_subbuffer_status get_next_subbuffer_mmap(
1645 struct lttng_consumer_stream *stream,
1646 struct stream_subbuffer *subbuffer)
1647 {
1648 int ret;
1649 enum get_next_subbuffer_status status;
1650 const char *addr;
1651
1652 status = get_subbuffer_common(stream, subbuffer);
1653 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1654 goto end;
1655 }
1656
1657 ret = get_current_subbuf_addr(stream, &addr);
1658 if (ret) {
1659 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1660 goto end;
1661 }
1662
1663 subbuffer->buffer.buffer = lttng_buffer_view_init(
1664 addr, 0, subbuffer->info.data.padded_subbuf_size);
1665 end:
1666 return status;
1667 }
1668
/*
 * get_next_subbuffer implementation for metadata streams on tracers that
 * support the "metadata check" variant of the ioctl: also reports whether
 * the metadata is in a coherent (parseable) state at the end of this
 * sub-buffer, which live clients rely on.
 *
 * Every error path falls through to the 'end' label where the raw kernctl
 * return code is mapped to a get_next_subbuffer_status.
 */
static
enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
		struct stream_subbuffer *subbuffer)
{
	int ret;
	const char *addr;
	bool coherent;
	enum get_next_subbuffer_status status;

	ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
			&coherent);
	if (ret) {
		goto end;
	}

	ret = stream->read_subbuffer_ops.extract_subbuffer_info(
			stream, subbuffer);
	if (ret) {
		goto end;
	}

	/* Record the coherency flag for the metadata bucketization logic. */
	LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);

	ret = get_current_subbuf_addr(stream, &addr);
	if (ret) {
		goto end;
	}

	/* Expose the acquired sub-buffer as a view over the mapped region. */
	subbuffer->buffer.buffer = lttng_buffer_view_init(
			addr, 0, subbuffer->info.data.padded_subbuf_size);
	DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
			subbuffer->info.metadata.padded_subbuf_size,
			coherent ? "true" : "false");
end:
	/* Map the raw kernctl return code to a status for the caller. */
	switch (ret) {
	case 0:
		status = GET_NEXT_SUBBUFFER_STATUS_OK;
		break;
	case -ENODATA:
	case -EAGAIN:
		/*
		 * The caller only expects -ENODATA when there is no data to
		 * read, but the kernel tracer returns -EAGAIN when there is
		 * currently no data for a non-finalized stream, and -ENODATA
		 * when there is no data for a finalized stream. Those can be
		 * combined into a -ENODATA return value.
		 */
		status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
		break;
	default:
		status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
		break;
	}

	return status;
}
1731
1732 static
1733 int put_next_subbuffer(struct lttng_consumer_stream *stream,
1734 struct stream_subbuffer *subbuffer __attribute__((unused)))
1735 {
1736 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1737
1738 if (ret) {
1739 if (ret == -EFAULT) {
1740 PERROR("Error in unreserving sub buffer");
1741 } else if (ret == -EIO) {
1742 /* Should never happen with newer LTTng versions */
1743 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
1744 }
1745 }
1746
1747 return ret;
1748 }
1749
1750 static
1751 bool is_get_next_check_metadata_available(int tracer_fd)
1752 {
1753 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1754 const bool available = ret != -ENOTTY;
1755
1756 if (ret == 0) {
1757 /* get succeeded, make sure to put the subbuffer. */
1758 kernctl_put_subbuf(tracer_fd);
1759 }
1760
1761 return available;
1762 }
1763
1764 static
1765 int signal_metadata(struct lttng_consumer_stream *stream,
1766 struct lttng_consumer_local_data *ctx __attribute__((unused)))
1767 {
1768 ASSERT_LOCKED(stream->metadata_rdv_lock);
1769 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1770 }
1771
1772 static
1773 int lttng_kconsumer_set_stream_ops(
1774 struct lttng_consumer_stream *stream)
1775 {
1776 int ret = 0;
1777
1778 if (stream->metadata_flag && stream->chan->is_live) {
1779 DBG("Attempting to enable metadata bucketization for live consumers");
1780 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1781 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1782 stream->read_subbuffer_ops.get_next_subbuffer =
1783 get_next_subbuffer_metadata_check;
1784 ret = consumer_stream_enable_metadata_bucketization(
1785 stream);
1786 if (ret) {
1787 goto end;
1788 }
1789 } else {
1790 /*
1791 * The kernel tracer version is too old to indicate
1792 * when the metadata stream has reached a "coherent"
1793 * (parseable) point.
1794 *
1795 * This means that a live viewer may see an incoherent
1796 * sequence of metadata and fail to parse it.
1797 */
1798 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1799 metadata_bucket_destroy(stream->metadata_bucket);
1800 stream->metadata_bucket = NULL;
1801 }
1802
1803 stream->read_subbuffer_ops.on_sleep = signal_metadata;
1804 }
1805
1806 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1807 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1808 stream->read_subbuffer_ops.get_next_subbuffer =
1809 get_next_subbuffer_mmap;
1810 } else {
1811 stream->read_subbuffer_ops.get_next_subbuffer =
1812 get_next_subbuffer_splice;
1813 }
1814 }
1815
1816 if (stream->metadata_flag) {
1817 stream->read_subbuffer_ops.extract_subbuffer_info =
1818 extract_metadata_subbuffer_info;
1819 } else {
1820 stream->read_subbuffer_ops.extract_subbuffer_info =
1821 extract_data_subbuffer_info;
1822 if (stream->chan->is_live) {
1823 stream->read_subbuffer_ops.send_live_beacon =
1824 consumer_flush_kernel_index;
1825 }
1826 }
1827
1828 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
1829 end:
1830 return ret;
1831 }
1832
1833 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1834 {
1835 int ret;
1836
1837 LTTNG_ASSERT(stream);
1838
1839 /*
1840 * Don't create anything if this is set for streaming or if there is
1841 * no current trace chunk on the parent channel.
1842 */
1843 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1844 stream->chan->trace_chunk) {
1845 ret = consumer_stream_create_output_files(stream, true);
1846 if (ret) {
1847 goto error;
1848 }
1849 }
1850
1851 if (stream->output == LTTNG_EVENT_MMAP) {
1852 /* get the len of the mmap region */
1853 unsigned long mmap_len;
1854
1855 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1856 if (ret != 0) {
1857 PERROR("kernctl_get_mmap_len");
1858 goto error_close_fd;
1859 }
1860 stream->mmap_len = (size_t) mmap_len;
1861
1862 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1863 MAP_PRIVATE, stream->wait_fd, 0);
1864 if (stream->mmap_base == MAP_FAILED) {
1865 PERROR("Error mmaping");
1866 ret = -1;
1867 goto error_close_fd;
1868 }
1869 }
1870
1871 ret = lttng_kconsumer_set_stream_ops(stream);
1872 if (ret) {
1873 goto error_close_fd;
1874 }
1875
1876 /* we return 0 to let the library handle the FD internally */
1877 return 0;
1878
1879 error_close_fd:
1880 if (stream->out_fd >= 0) {
1881 int err;
1882
1883 err = close(stream->out_fd);
1884 LTTNG_ASSERT(!err);
1885 stream->out_fd = -1;
1886 }
1887 error:
1888 return ret;
1889 }
1890
1891 /*
1892 * Check if data is still being extracted from the buffers for a specific
1893 * stream. Consumer data lock MUST be acquired before calling this function
1894 * and the stream lock.
1895 *
1896 * Return 1 if the traced data are still getting read else 0 meaning that the
1897 * data is available for trace viewer reading.
1898 */
1899 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
1900 {
1901 int ret;
1902
1903 LTTNG_ASSERT(stream);
1904
1905 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1906 ret = 0;
1907 goto end;
1908 }
1909
1910 ret = kernctl_get_next_subbuf(stream->wait_fd);
1911 if (ret == 0) {
1912 /* There is still data so let's put back this subbuffer. */
1913 ret = kernctl_put_subbuf(stream->wait_fd);
1914 LTTNG_ASSERT(ret == 0);
1915 ret = 1; /* Data is pending */
1916 goto end;
1917 }
1918
1919 /* Data is NOT pending and ready to be read. */
1920 ret = 0;
1921
1922 end:
1923 return ret;
1924 }