Clean-up: consumerd: reduce duplication of stream output close code
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <poll.h>
12 #include <pthread.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <sys/mman.h>
16 #include <sys/socket.h>
17 #include <sys/types.h>
18 #include <inttypes.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21 #include <stdint.h>
22
23 #include <bin/lttng-consumerd/health-consumerd.hpp>
24 #include <common/common.hpp>
25 #include <common/kernel-ctl/kernel-ctl.hpp>
26 #include <common/sessiond-comm/sessiond-comm.hpp>
27 #include <common/sessiond-comm/relayd.hpp>
28 #include <common/compat/fcntl.hpp>
29 #include <common/compat/endian.hpp>
30 #include <common/pipe.hpp>
31 #include <common/relayd/relayd.hpp>
32 #include <common/utils.hpp>
33 #include <common/consumer/consumer-stream.hpp>
34 #include <common/index/index.hpp>
35 #include <common/consumer/consumer-timer.hpp>
36 #include <common/optional.hpp>
37 #include <common/buffer-view.hpp>
38 #include <common/consumer/consumer.hpp>
39 #include <common/consumer/metadata-bucket.hpp>
40
41 #include "kernel-consumer.hpp"
42
43 extern struct lttng_consumer_global_data the_consumer_data;
44 extern int consumer_poll_timeout;
45
46 /*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
51 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
52 {
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
62 PERROR("Getting sub-buffer snapshot.");
63 }
64
65 return ret;
66 }
67
68 /*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73 int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75 {
76 LTTNG_ASSERT(stream);
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79 }
80
81 /*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
86 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
87 unsigned long *pos)
88 {
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
94 PERROR("kernctl_snapshot_get_produced");
95 }
96
97 return ret;
98 }
99
100 /*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107 {
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
113 PERROR("kernctl_snapshot_get_consumed");
114 }
115
116 return ret;
117 }
118
119 static
120 int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122 {
123 int ret;
124 unsigned long mmap_offset;
125 const char *mmap_base = (const char *) stream->mmap_base;
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134 error:
135 return ret;
136 }
137
/*
 * Take a snapshot of all the streams of a channel.
 * RCU read-side lock must be held across this function to ensure existence of
 * channel.
 *
 * The channel lock is taken for the whole operation, and each stream's lock
 * is taken while that stream is being snapshotted.
 *
 * Returns 0 on success, < 0 on error
 */
static int lttng_kconsumer_snapshot_channel(
		struct lttng_consumer_channel *channel,
		uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream)
{
	int ret;
	struct lttng_consumer_stream *stream;

	DBG("Kernel consumer snapshot channel %" PRIu64, key);

	/* Prevent channel modifications while we perform the snapshot.*/
	pthread_mutex_lock(&channel->lock);

	rcu_read_lock();

	/* Splice is not supported yet for channel snapshot. */
	if (channel->output != CONSUMER_CHANNEL_MMAP) {
		ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
				channel->name);
		ret = -1;
		goto end;
	}

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		unsigned long consumed_pos, produced_pos;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		LTTNG_ASSERT(channel->trace_chunk);
		if (!lttng_trace_chunk_get(channel->trace_chunk)) {
			/*
			 * Can't happen barring an internal error as the channel
			 * holds a reference to the trace chunk.
			 */
			ERR("Failed to acquire reference to channel's trace chunk");
			ret = -1;
			goto end_unlock;
		}
		LTTNG_ASSERT(!stream->trace_chunk);
		stream->trace_chunk = channel->trace_chunk;

		/*
		 * Assign the received relayd ID so we can use it for streaming. The streams
		 * are not visible to anyone so this is OK to change it.
		 */
		stream->net_seq_idx = relayd_id;
		channel->relayd_id = relayd_id;
		/* -1ULL means "no relayd": write to local output files instead. */
		if (relayd_id != (uint64_t) -1ULL) {
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				ERR("sending stream to relayd");
				goto error_close_stream_output;
			}
		} else {
			ret = consumer_stream_create_output_files(stream,
					false);
			if (ret < 0) {
				goto error_close_stream_output;
			}
			DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
					stream->key);
		}

		ret = kernctl_buffer_flush_empty(stream->wait_fd);
		if (ret < 0) {
			/*
			 * Doing a buffer flush which does not take into
			 * account empty packets. This is not perfect
			 * for stream intersection, but required as a
			 * fall-back when "flush_empty" is not
			 * implemented by lttng-modules.
			 */
			ret = kernctl_buffer_flush(stream->wait_fd);
			if (ret < 0) {
				ERR("Failed to flush kernel stream");
				goto error_close_stream_output;
			}
			/*
			 * NOTE(review): this jump leaves the per-stream loop
			 * after the first stream whose "flush_empty" is not
			 * supported, returning the (>= 0) flush result without
			 * snapshotting the remaining streams or closing this
			 * stream's output — confirm whether a fall-through to
			 * the snapshot logic (or `continue`) was intended.
			 */
			goto end_unlock;
		}

		ret = lttng_kconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking kernel snapshot");
			goto error_close_stream_output;
		}

		ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced kernel snapshot position");
			goto error_close_stream_output;
		}

		ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd kernel snapshot position");
			goto error_close_stream_output;
		}

		/*
		 * Possibly advance the start position so that at most
		 * nb_packets_per_stream packets are recorded.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		/*
		 * Signed difference makes the comparison robust to position
		 * counter wrap-around.
		 */
		while ((long) (consumed_pos - produced_pos) < 0) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();
			DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);

			ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("kernctl_get_subbuf snapshot");
					goto error_close_stream_output;
				}
				/*
				 * -EAGAIN: sub-buffer unavailable; account it
				 * as a lost packet and move to the next one.
				 */
				DBG("Kernel consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			/* Data length (without padding). */
			ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			/* Length including trailing padding. */
			ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
			if (ret < 0) {
				ERR("Snapshot kernctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(
					subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(
					stream, &subbuf_view,
					padded_len - len);
			/*
			 * We write the padded len in local tracefiles but the data len
			 * when using a relay. Display the error but continue processing
			 * to try to release the subbuffer.
			 */
			if (relayd_id != (uint64_t) -1ULL) {
				if (read_len != len) {
					ERR("Error sending to the relay (ret: %zd != len: %lu)",
							read_len, len);
				}
			} else {
				if (read_len != padded_len) {
					ERR("Error writing to tracefile (ret: %zd != len: %lu)",
							read_len, padded_len);
				}
			}

			/* Release the sub-buffer back to the tracer. */
			ret = kernctl_put_subbuf(stream->wait_fd);
			if (ret < 0) {
				ERR("Snapshot kernctl_put_subbuf");
				goto error_close_stream_output;
			}
			consumed_pos += stream->max_sb_size;
		}

		consumer_stream_close_output(stream);
		pthread_mutex_unlock(&stream->lock);
	}

	/* All good! */
	ret = 0;
	goto end;

error_put_subbuf:
	/* Best-effort release of the sub-buffer acquired above. */
	ret = kernctl_put_subbuf(stream->wait_fd);
	if (ret < 0) {
		ERR("Snapshot kernctl_put_subbuf error path");
	}
error_close_stream_output:
	consumer_stream_close_output(stream);
end_unlock:
	pthread_mutex_unlock(&stream->lock);
end:
	rcu_read_unlock();
	pthread_mutex_unlock(&channel->lock);
	return ret;
}
342
/*
 * Read the whole metadata available for a snapshot.
 * RCU read-side lock must be held across this function to ensure existence of
 * metadata_channel.
 *
 * On both success and failure, the metadata stream is destroyed and the
 * channel's metadata_stream pointer is reset to NULL.
 *
 * Returns 0 on success, < 0 on error
 */
static int lttng_kconsumer_snapshot_metadata(
		struct lttng_consumer_channel *metadata_channel,
		uint64_t key, char *path, uint64_t relayd_id,
		struct lttng_consumer_local_data *ctx)
{
	int ret, use_relayd = 0;
	ssize_t ret_read;
	struct lttng_consumer_stream *metadata_stream;

	LTTNG_ASSERT(ctx);

	DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
			key, path);

	rcu_read_lock();

	metadata_stream = metadata_channel->metadata_stream;
	LTTNG_ASSERT(metadata_stream);

	/* Held until error_snapshot/exit; protects the stream while reading. */
	metadata_stream->read_subbuffer_ops.lock(metadata_stream);
	LTTNG_ASSERT(metadata_channel->trace_chunk);
	LTTNG_ASSERT(metadata_stream->trace_chunk);

	/* Flag once that we have a valid relayd for the stream. */
	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	/* Set up the output: relayd connection or local files. */
	if (use_relayd) {
		ret = consumer_send_relayd_stream(metadata_stream, path);
		if (ret < 0) {
			goto error_snapshot;
		}
	} else {
		ret = consumer_stream_create_output_files(metadata_stream,
				false);
		if (ret < 0) {
			goto error_snapshot;
		}
	}

	/* Drain the metadata stream until no data remains (ret_read == 0). */
	do {
		health_code_update();

		ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
		if (ret_read < 0) {
			ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
					ret_read);
			ret = ret_read;
			goto error_snapshot;
		}
	} while (ret_read > 0);

	if (use_relayd) {
		close_relayd_stream(metadata_stream);
		metadata_stream->net_seq_idx = (uint64_t) -1ULL;
	} else {
		/* Close the local output file and drop the chunk reference. */
		if (metadata_stream->out_fd >= 0) {
			ret = close(metadata_stream->out_fd);
			if (ret < 0) {
				PERROR("Kernel consumer snapshot metadata close out_fd");
				/*
				 * Don't go on error here since the snapshot was successful at this
				 * point but somehow the close failed.
				 */
			}
			metadata_stream->out_fd = -1;
			lttng_trace_chunk_put(metadata_stream->trace_chunk);
			metadata_stream->trace_chunk = NULL;
		}
	}

	ret = 0;
error_snapshot:
	/* Common exit path: unlock, tear down the stream, detach it. */
	metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
	consumer_stream_destroy(metadata_stream, NULL);
	metadata_channel->metadata_stream = NULL;
	rcu_read_unlock();
	return ret;
}
430
431 /*
432 * Receive command from session daemon and process it.
433 *
434 * Return 1 on success else a negative value or 0.
435 */
436 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
437 int sock, struct pollfd *consumer_sockpoll)
438 {
439 int ret_func;
440 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
441 struct lttcomm_consumer_msg msg;
442
443 health_code_update();
444
445 {
446 ssize_t ret_recv;
447
448 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
449 if (ret_recv != sizeof(msg)) {
450 if (ret_recv > 0) {
451 lttng_consumer_send_error(ctx,
452 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
453 ret_recv = -1;
454 }
455 return ret_recv;
456 }
457 }
458
459 health_code_update();
460
461 /* Deprecated command */
462 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
463
464 health_code_update();
465
466 /* relayd needs RCU read-side protection */
467 rcu_read_lock();
468
469 switch (msg.cmd_type) {
470 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
471 {
472 uint32_t major = msg.u.relayd_sock.major;
473 uint32_t minor = msg.u.relayd_sock.minor;
474 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
475 msg.u.relayd_sock.relayd_socket_protocol;
476
477 /* Session daemon status message are handled in the following call. */
478 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
479 msg.u.relayd_sock.type, ctx, sock,
480 consumer_sockpoll, msg.u.relayd_sock.session_id,
481 msg.u.relayd_sock.relayd_session_id, major,
482 minor, protocol);
483 goto end_nosignal;
484 }
485 case LTTNG_CONSUMER_ADD_CHANNEL:
486 {
487 struct lttng_consumer_channel *new_channel;
488 int ret_send_status, ret_add_channel = 0;
489 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
490
491 health_code_update();
492
493 /* First send a status message before receiving the fds. */
494 ret_send_status = consumer_send_status_msg(sock, ret_code);
495 if (ret_send_status < 0) {
496 /* Somehow, the session daemon is not responding anymore. */
497 goto error_fatal;
498 }
499
500 health_code_update();
501
502 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
503 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
504 msg.u.channel.session_id,
505 msg.u.channel.chunk_id.is_set ?
506 &chunk_id : NULL,
507 msg.u.channel.pathname,
508 msg.u.channel.name,
509 msg.u.channel.relayd_id, msg.u.channel.output,
510 msg.u.channel.tracefile_size,
511 msg.u.channel.tracefile_count, 0,
512 msg.u.channel.monitor,
513 msg.u.channel.live_timer_interval,
514 msg.u.channel.is_live,
515 NULL, NULL);
516 if (new_channel == NULL) {
517 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
518 goto end_nosignal;
519 }
520 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
521 switch (msg.u.channel.output) {
522 case LTTNG_EVENT_SPLICE:
523 new_channel->output = CONSUMER_CHANNEL_SPLICE;
524 break;
525 case LTTNG_EVENT_MMAP:
526 new_channel->output = CONSUMER_CHANNEL_MMAP;
527 break;
528 default:
529 ERR("Channel output unknown %d", msg.u.channel.output);
530 goto end_nosignal;
531 }
532
533 /* Translate and save channel type. */
534 switch (msg.u.channel.type) {
535 case CONSUMER_CHANNEL_TYPE_DATA:
536 case CONSUMER_CHANNEL_TYPE_METADATA:
537 new_channel->type = (consumer_channel_type) msg.u.channel.type;
538 break;
539 default:
540 abort();
541 goto end_nosignal;
542 };
543
544 health_code_update();
545
546 if (ctx->on_recv_channel != NULL) {
547 int ret_recv_channel =
548 ctx->on_recv_channel(new_channel);
549 if (ret_recv_channel == 0) {
550 ret_add_channel = consumer_add_channel(
551 new_channel, ctx);
552 } else if (ret_recv_channel < 0) {
553 goto end_nosignal;
554 }
555 } else {
556 ret_add_channel =
557 consumer_add_channel(new_channel, ctx);
558 }
559 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
560 !ret_add_channel) {
561 int monitor_start_ret;
562
563 DBG("Consumer starting monitor timer");
564 consumer_timer_live_start(new_channel,
565 msg.u.channel.live_timer_interval);
566 monitor_start_ret = consumer_timer_monitor_start(
567 new_channel,
568 msg.u.channel.monitor_timer_interval);
569 if (monitor_start_ret < 0) {
570 ERR("Starting channel monitoring timer failed");
571 goto end_nosignal;
572 }
573 }
574
575 health_code_update();
576
577 /* If we received an error in add_channel, we need to report it. */
578 if (ret_add_channel < 0) {
579 ret_send_status = consumer_send_status_msg(
580 sock, ret_add_channel);
581 if (ret_send_status < 0) {
582 goto error_fatal;
583 }
584 goto end_nosignal;
585 }
586
587 goto end_nosignal;
588 }
589 case LTTNG_CONSUMER_ADD_STREAM:
590 {
591 int fd;
592 struct lttng_pipe *stream_pipe;
593 struct lttng_consumer_stream *new_stream;
594 struct lttng_consumer_channel *channel;
595 int alloc_ret = 0;
596 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
597 ssize_t ret_pipe_write, ret_recv;
598
599 /*
600 * Get stream's channel reference. Needed when adding the stream to the
601 * global hash table.
602 */
603 channel = consumer_find_channel(msg.u.stream.channel_key);
604 if (!channel) {
605 /*
606 * We could not find the channel. Can happen if cpu hotplug
607 * happens while tearing down.
608 */
609 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
610 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
611 }
612
613 health_code_update();
614
615 /* First send a status message before receiving the fds. */
616 ret_send_status = consumer_send_status_msg(sock, ret_code);
617 if (ret_send_status < 0) {
618 /* Somehow, the session daemon is not responding anymore. */
619 goto error_add_stream_fatal;
620 }
621
622 health_code_update();
623
624 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
625 /* Channel was not found. */
626 goto error_add_stream_nosignal;
627 }
628
629 /* Blocking call */
630 health_poll_entry();
631 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
632 health_poll_exit();
633 if (ret_poll) {
634 goto error_add_stream_fatal;
635 }
636
637 health_code_update();
638
639 /* Get stream file descriptor from socket */
640 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
641 if (ret_recv != sizeof(fd)) {
642 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
643 ret_func = ret_recv;
644 goto end;
645 }
646
647 health_code_update();
648
649 /*
650 * Send status code to session daemon only if the recv works. If the
651 * above recv() failed, the session daemon is notified through the
652 * error socket and the teardown is eventually done.
653 */
654 ret_send_status = consumer_send_status_msg(sock, ret_code);
655 if (ret_send_status < 0) {
656 /* Somehow, the session daemon is not responding anymore. */
657 goto error_add_stream_nosignal;
658 }
659
660 health_code_update();
661
662 pthread_mutex_lock(&channel->lock);
663 new_stream = consumer_stream_create(
664 channel,
665 channel->key,
666 fd,
667 channel->name,
668 channel->relayd_id,
669 channel->session_id,
670 channel->trace_chunk,
671 msg.u.stream.cpu,
672 &alloc_ret,
673 channel->type,
674 channel->monitor);
675 if (new_stream == NULL) {
676 switch (alloc_ret) {
677 case -ENOMEM:
678 case -EINVAL:
679 default:
680 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
681 break;
682 }
683 pthread_mutex_unlock(&channel->lock);
684 goto error_add_stream_nosignal;
685 }
686
687 new_stream->wait_fd = fd;
688 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
689 new_stream->wait_fd, &new_stream->max_sb_size);
690 if (ret_get_max_subbuf_size < 0) {
691 pthread_mutex_unlock(&channel->lock);
692 ERR("Failed to get kernel maximal subbuffer size");
693 goto error_add_stream_nosignal;
694 }
695
696 consumer_stream_update_channel_attributes(new_stream,
697 channel);
698
699 /*
700 * We've just assigned the channel to the stream so increment the
701 * refcount right now. We don't need to increment the refcount for
702 * streams in no monitor because we handle manually the cleanup of
703 * those. It is very important to make sure there is NO prior
704 * consumer_del_stream() calls or else the refcount will be unbalanced.
705 */
706 if (channel->monitor) {
707 uatomic_inc(&new_stream->chan->refcount);
708 }
709
710 /*
711 * The buffer flush is done on the session daemon side for the kernel
712 * so no need for the stream "hangup_flush_done" variable to be
713 * tracked. This is important for a kernel stream since we don't rely
714 * on the flush state of the stream to read data. It's not the case for
715 * user space tracing.
716 */
717 new_stream->hangup_flush_done = 0;
718
719 health_code_update();
720
721 pthread_mutex_lock(&new_stream->lock);
722 if (ctx->on_recv_stream) {
723 int ret_recv_stream = ctx->on_recv_stream(new_stream);
724 if (ret_recv_stream < 0) {
725 pthread_mutex_unlock(&new_stream->lock);
726 pthread_mutex_unlock(&channel->lock);
727 consumer_stream_free(new_stream);
728 goto error_add_stream_nosignal;
729 }
730 }
731 health_code_update();
732
733 if (new_stream->metadata_flag) {
734 channel->metadata_stream = new_stream;
735 }
736
737 /* Do not monitor this stream. */
738 if (!channel->monitor) {
739 DBG("Kernel consumer add stream %s in no monitor mode with "
740 "relayd id %" PRIu64, new_stream->name,
741 new_stream->net_seq_idx);
742 cds_list_add(&new_stream->send_node, &channel->streams.head);
743 pthread_mutex_unlock(&new_stream->lock);
744 pthread_mutex_unlock(&channel->lock);
745 goto end_add_stream;
746 }
747
748 /* Send stream to relayd if the stream has an ID. */
749 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
750 int ret_send_relayd_stream;
751
752 ret_send_relayd_stream = consumer_send_relayd_stream(
753 new_stream, new_stream->chan->pathname);
754 if (ret_send_relayd_stream < 0) {
755 pthread_mutex_unlock(&new_stream->lock);
756 pthread_mutex_unlock(&channel->lock);
757 consumer_stream_free(new_stream);
758 goto error_add_stream_nosignal;
759 }
760
761 /*
762 * If adding an extra stream to an already
763 * existing channel (e.g. cpu hotplug), we need
764 * to send the "streams_sent" command to relayd.
765 */
766 if (channel->streams_sent_to_relayd) {
767 int ret_send_relayd_streams_sent;
768
769 ret_send_relayd_streams_sent =
770 consumer_send_relayd_streams_sent(
771 new_stream->net_seq_idx);
772 if (ret_send_relayd_streams_sent < 0) {
773 pthread_mutex_unlock(&new_stream->lock);
774 pthread_mutex_unlock(&channel->lock);
775 goto error_add_stream_nosignal;
776 }
777 }
778 }
779 pthread_mutex_unlock(&new_stream->lock);
780 pthread_mutex_unlock(&channel->lock);
781
782 /* Get the right pipe where the stream will be sent. */
783 if (new_stream->metadata_flag) {
784 consumer_add_metadata_stream(new_stream);
785 stream_pipe = ctx->consumer_metadata_pipe;
786 } else {
787 consumer_add_data_stream(new_stream);
788 stream_pipe = ctx->consumer_data_pipe;
789 }
790
791 /* Visible to other threads */
792 new_stream->globally_visible = 1;
793
794 health_code_update();
795
796 ret_pipe_write = lttng_pipe_write(
797 stream_pipe, &new_stream, sizeof(new_stream));
798 if (ret_pipe_write < 0) {
799 ERR("Consumer write %s stream to pipe %d",
800 new_stream->metadata_flag ? "metadata" : "data",
801 lttng_pipe_get_writefd(stream_pipe));
802 if (new_stream->metadata_flag) {
803 consumer_del_stream_for_metadata(new_stream);
804 } else {
805 consumer_del_stream_for_data(new_stream);
806 }
807 goto error_add_stream_nosignal;
808 }
809
810 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
811 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
812 end_add_stream:
813 break;
814 error_add_stream_nosignal:
815 goto end_nosignal;
816 error_add_stream_fatal:
817 goto error_fatal;
818 }
819 case LTTNG_CONSUMER_STREAMS_SENT:
820 {
821 struct lttng_consumer_channel *channel;
822 int ret_send_status;
823
824 /*
825 * Get stream's channel reference. Needed when adding the stream to the
826 * global hash table.
827 */
828 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
829 if (!channel) {
830 /*
831 * We could not find the channel. Can happen if cpu hotplug
832 * happens while tearing down.
833 */
834 ERR("Unable to find channel key %" PRIu64,
835 msg.u.sent_streams.channel_key);
836 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
837 }
838
839 health_code_update();
840
841 /*
842 * Send status code to session daemon.
843 */
844 ret_send_status = consumer_send_status_msg(sock, ret_code);
845 if (ret_send_status < 0 ||
846 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
847 /* Somehow, the session daemon is not responding anymore. */
848 goto error_streams_sent_nosignal;
849 }
850
851 health_code_update();
852
853 /*
854 * We should not send this message if we don't monitor the
855 * streams in this channel.
856 */
857 if (!channel->monitor) {
858 goto end_error_streams_sent;
859 }
860
861 health_code_update();
862 /* Send stream to relayd if the stream has an ID. */
863 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
864 int ret_send_relay_streams;
865
866 ret_send_relay_streams = consumer_send_relayd_streams_sent(
867 msg.u.sent_streams.net_seq_idx);
868 if (ret_send_relay_streams < 0) {
869 goto error_streams_sent_nosignal;
870 }
871 channel->streams_sent_to_relayd = true;
872 }
873 end_error_streams_sent:
874 break;
875 error_streams_sent_nosignal:
876 goto end_nosignal;
877 }
878 case LTTNG_CONSUMER_UPDATE_STREAM:
879 {
880 rcu_read_unlock();
881 return -ENOSYS;
882 }
883 case LTTNG_CONSUMER_DESTROY_RELAYD:
884 {
885 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
886 struct consumer_relayd_sock_pair *relayd;
887 int ret_send_status;
888
889 DBG("Kernel consumer destroying relayd %" PRIu64, index);
890
891 /* Get relayd reference if exists. */
892 relayd = consumer_find_relayd(index);
893 if (relayd == NULL) {
894 DBG("Unable to find relayd %" PRIu64, index);
895 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
896 }
897
898 /*
899 * Each relayd socket pair has a refcount of stream attached to it
900 * which tells if the relayd is still active or not depending on the
901 * refcount value.
902 *
903 * This will set the destroy flag of the relayd object and destroy it
904 * if the refcount reaches zero when called.
905 *
906 * The destroy can happen either here or when a stream fd hangs up.
907 */
908 if (relayd) {
909 consumer_flag_relayd_for_destroy(relayd);
910 }
911
912 health_code_update();
913
914 ret_send_status = consumer_send_status_msg(sock, ret_code);
915 if (ret_send_status < 0) {
916 /* Somehow, the session daemon is not responding anymore. */
917 goto error_fatal;
918 }
919
920 goto end_nosignal;
921 }
922 case LTTNG_CONSUMER_DATA_PENDING:
923 {
924 int32_t ret_data_pending;
925 uint64_t id = msg.u.data_pending.session_id;
926 ssize_t ret_send;
927
928 DBG("Kernel consumer data pending command for id %" PRIu64, id);
929
930 ret_data_pending = consumer_data_pending(id);
931
932 health_code_update();
933
934 /* Send back returned value to session daemon */
935 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
936 sizeof(ret_data_pending));
937 if (ret_send < 0) {
938 PERROR("send data pending ret code");
939 goto error_fatal;
940 }
941
942 /*
943 * No need to send back a status message since the data pending
944 * returned value is the response.
945 */
946 break;
947 }
948 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
949 {
950 struct lttng_consumer_channel *channel;
951 uint64_t key = msg.u.snapshot_channel.key;
952 int ret_send_status;
953
954 channel = consumer_find_channel(key);
955 if (!channel) {
956 ERR("Channel %" PRIu64 " not found", key);
957 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
958 } else {
959 if (msg.u.snapshot_channel.metadata == 1) {
960 int ret_snapshot;
961
962 ret_snapshot = lttng_kconsumer_snapshot_metadata(
963 channel, key,
964 msg.u.snapshot_channel.pathname,
965 msg.u.snapshot_channel.relayd_id,
966 ctx);
967 if (ret_snapshot < 0) {
968 ERR("Snapshot metadata failed");
969 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
970 }
971 } else {
972 int ret_snapshot;
973
974 ret_snapshot = lttng_kconsumer_snapshot_channel(
975 channel, key,
976 msg.u.snapshot_channel.pathname,
977 msg.u.snapshot_channel.relayd_id,
978 msg.u.snapshot_channel
979 .nb_packets_per_stream);
980 if (ret_snapshot < 0) {
981 ERR("Snapshot channel failed");
982 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
983 }
984 }
985 }
986 health_code_update();
987
988 ret_send_status = consumer_send_status_msg(sock, ret_code);
989 if (ret_send_status < 0) {
990 /* Somehow, the session daemon is not responding anymore. */
991 goto end_nosignal;
992 }
993 break;
994 }
995 case LTTNG_CONSUMER_DESTROY_CHANNEL:
996 {
997 uint64_t key = msg.u.destroy_channel.key;
998 struct lttng_consumer_channel *channel;
999 int ret_send_status;
1000
1001 channel = consumer_find_channel(key);
1002 if (!channel) {
1003 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
1004 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1005 }
1006
1007 health_code_update();
1008
1009 ret_send_status = consumer_send_status_msg(sock, ret_code);
1010 if (ret_send_status < 0) {
1011 /* Somehow, the session daemon is not responding anymore. */
1012 goto end_destroy_channel;
1013 }
1014
1015 health_code_update();
1016
1017 /* Stop right now if no channel was found. */
1018 if (!channel) {
1019 goto end_destroy_channel;
1020 }
1021
1022 /*
1023 * This command should ONLY be issued for channel with streams set in
1024 * no monitor mode.
1025 */
1026 LTTNG_ASSERT(!channel->monitor);
1027
1028 /*
1029 * The refcount should ALWAYS be 0 in the case of a channel in no
1030 * monitor mode.
1031 */
1032 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
1033
1034 consumer_del_channel(channel);
1035 end_destroy_channel:
1036 goto end_nosignal;
1037 }
1038 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1039 {
1040 ssize_t ret;
1041 uint64_t count;
1042 struct lttng_consumer_channel *channel;
1043 uint64_t id = msg.u.discarded_events.session_id;
1044 uint64_t key = msg.u.discarded_events.channel_key;
1045
1046 DBG("Kernel consumer discarded events command for session id %"
1047 PRIu64 ", channel key %" PRIu64, id, key);
1048
1049 channel = consumer_find_channel(key);
1050 if (!channel) {
1051 ERR("Kernel consumer discarded events channel %"
1052 PRIu64 " not found", key);
1053 count = 0;
1054 } else {
1055 count = channel->discarded_events;
1056 }
1057
1058 health_code_update();
1059
1060 /* Send back returned value to session daemon */
1061 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1062 if (ret < 0) {
1063 PERROR("send discarded events");
1064 goto error_fatal;
1065 }
1066
1067 break;
1068 }
1069 case LTTNG_CONSUMER_LOST_PACKETS:
1070 {
1071 ssize_t ret;
1072 uint64_t count;
1073 struct lttng_consumer_channel *channel;
1074 uint64_t id = msg.u.lost_packets.session_id;
1075 uint64_t key = msg.u.lost_packets.channel_key;
1076
1077 DBG("Kernel consumer lost packets command for session id %"
1078 PRIu64 ", channel key %" PRIu64, id, key);
1079
1080 channel = consumer_find_channel(key);
1081 if (!channel) {
1082 ERR("Kernel consumer lost packets channel %"
1083 PRIu64 " not found", key);
1084 count = 0;
1085 } else {
1086 count = channel->lost_packets;
1087 }
1088
1089 health_code_update();
1090
1091 /* Send back returned value to session daemon */
1092 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1093 if (ret < 0) {
1094 PERROR("send lost packets");
1095 goto error_fatal;
1096 }
1097
1098 break;
1099 }
1100 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1101 {
1102 int channel_monitor_pipe;
1103 int ret_send_status, ret_set_channel_monitor_pipe;
1104 ssize_t ret_recv;
1105
1106 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1107 /* Successfully received the command's type. */
1108 ret_send_status = consumer_send_status_msg(sock, ret_code);
1109 if (ret_send_status < 0) {
1110 goto error_fatal;
1111 }
1112
1113 ret_recv = lttcomm_recv_fds_unix_sock(
1114 sock, &channel_monitor_pipe, 1);
1115 if (ret_recv != sizeof(channel_monitor_pipe)) {
1116 ERR("Failed to receive channel monitor pipe");
1117 goto error_fatal;
1118 }
1119
1120 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1121 ret_set_channel_monitor_pipe =
1122 consumer_timer_thread_set_channel_monitor_pipe(
1123 channel_monitor_pipe);
1124 if (!ret_set_channel_monitor_pipe) {
1125 int flags;
1126 int ret_fcntl;
1127
1128 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1129 /* Set the pipe as non-blocking. */
1130 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1131 if (ret_fcntl == -1) {
1132 PERROR("fcntl get flags of the channel monitoring pipe");
1133 goto error_fatal;
1134 }
1135 flags = ret_fcntl;
1136
1137 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
1138 flags | O_NONBLOCK);
1139 if (ret_fcntl == -1) {
1140 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1141 goto error_fatal;
1142 }
1143 DBG("Channel monitor pipe set as non-blocking");
1144 } else {
1145 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1146 }
1147 ret_send_status = consumer_send_status_msg(sock, ret_code);
1148 if (ret_send_status < 0) {
1149 goto error_fatal;
1150 }
1151 break;
1152 }
1153 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1154 {
1155 struct lttng_consumer_channel *channel;
1156 uint64_t key = msg.u.rotate_channel.key;
1157 int ret_send_status;
1158
1159 DBG("Consumer rotate channel %" PRIu64, key);
1160
1161 channel = consumer_find_channel(key);
1162 if (!channel) {
1163 ERR("Channel %" PRIu64 " not found", key);
1164 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1165 } else {
1166 /*
1167 * Sample the rotate position of all the streams in this channel.
1168 */
1169 int ret_rotate_channel;
1170
1171 ret_rotate_channel = lttng_consumer_rotate_channel(
1172 channel, key,
1173 msg.u.rotate_channel.relayd_id);
1174 if (ret_rotate_channel < 0) {
1175 ERR("Rotate channel failed");
1176 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1177 }
1178
1179 health_code_update();
1180 }
1181
1182 ret_send_status = consumer_send_status_msg(sock, ret_code);
1183 if (ret_send_status < 0) {
1184 /* Somehow, the session daemon is not responding anymore. */
1185 goto error_rotate_channel;
1186 }
1187 if (channel) {
1188 /* Rotate the streams that are ready right now. */
1189 int ret_rotate;
1190
1191 ret_rotate = lttng_consumer_rotate_ready_streams(
1192 channel, key);
1193 if (ret_rotate < 0) {
1194 ERR("Rotate ready streams failed");
1195 }
1196 }
1197 break;
1198 error_rotate_channel:
1199 goto end_nosignal;
1200 }
1201 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1202 {
1203 struct lttng_consumer_channel *channel;
1204 uint64_t key = msg.u.clear_channel.key;
1205 int ret_send_status;
1206
1207 channel = consumer_find_channel(key);
1208 if (!channel) {
1209 DBG("Channel %" PRIu64 " not found", key);
1210 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1211 } else {
1212 int ret_clear_channel;
1213
1214 ret_clear_channel =
1215 lttng_consumer_clear_channel(channel);
1216 if (ret_clear_channel) {
1217 ERR("Clear channel failed");
1218 ret_code = (lttcomm_return_code) ret_clear_channel;
1219 }
1220
1221 health_code_update();
1222 }
1223
1224 ret_send_status = consumer_send_status_msg(sock, ret_code);
1225 if (ret_send_status < 0) {
1226 /* Somehow, the session daemon is not responding anymore. */
1227 goto end_nosignal;
1228 }
1229
1230 break;
1231 }
1232 case LTTNG_CONSUMER_INIT:
1233 {
1234 int ret_send_status;
1235 lttng_uuid sessiond_uuid;
1236
1237 std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
1238 sessiond_uuid.begin());
1239
1240 ret_code = lttng_consumer_init_command(ctx,
1241 sessiond_uuid);
1242 health_code_update();
1243 ret_send_status = consumer_send_status_msg(sock, ret_code);
1244 if (ret_send_status < 0) {
1245 /* Somehow, the session daemon is not responding anymore. */
1246 goto end_nosignal;
1247 }
1248 break;
1249 }
1250 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
1251 {
1252 const struct lttng_credentials credentials = {
1253 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1254 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
1255 };
1256 const bool is_local_trace =
1257 !msg.u.create_trace_chunk.relayd_id.is_set;
1258 const uint64_t relayd_id =
1259 msg.u.create_trace_chunk.relayd_id.value;
1260 const char *chunk_override_name =
1261 *msg.u.create_trace_chunk.override_name ?
1262 msg.u.create_trace_chunk.override_name :
1263 NULL;
1264 struct lttng_directory_handle *chunk_directory_handle = NULL;
1265
1266 /*
1267 * The session daemon will only provide a chunk directory file
1268 * descriptor for local traces.
1269 */
1270 if (is_local_trace) {
1271 int chunk_dirfd;
1272 int ret_send_status;
1273 ssize_t ret_recv;
1274
1275 /* Acnowledge the reception of the command. */
1276 ret_send_status = consumer_send_status_msg(
1277 sock, LTTCOMM_CONSUMERD_SUCCESS);
1278 if (ret_send_status < 0) {
1279 /* Somehow, the session daemon is not responding anymore. */
1280 goto end_nosignal;
1281 }
1282
1283 ret_recv = lttcomm_recv_fds_unix_sock(
1284 sock, &chunk_dirfd, 1);
1285 if (ret_recv != sizeof(chunk_dirfd)) {
1286 ERR("Failed to receive trace chunk directory file descriptor");
1287 goto error_fatal;
1288 }
1289
1290 DBG("Received trace chunk directory fd (%d)",
1291 chunk_dirfd);
1292 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
1293 chunk_dirfd);
1294 if (!chunk_directory_handle) {
1295 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1296 if (close(chunk_dirfd)) {
1297 PERROR("Failed to close chunk directory file descriptor");
1298 }
1299 goto error_fatal;
1300 }
1301 }
1302
1303 ret_code = lttng_consumer_create_trace_chunk(
1304 !is_local_trace ? &relayd_id : NULL,
1305 msg.u.create_trace_chunk.session_id,
1306 msg.u.create_trace_chunk.chunk_id,
1307 (time_t) msg.u.create_trace_chunk
1308 .creation_timestamp,
1309 chunk_override_name,
1310 msg.u.create_trace_chunk.credentials.is_set ?
1311 &credentials :
1312 NULL,
1313 chunk_directory_handle);
1314 lttng_directory_handle_put(chunk_directory_handle);
1315 goto end_msg_sessiond;
1316 }
1317 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
1318 {
1319 enum lttng_trace_chunk_command_type close_command =
1320 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
1321 const uint64_t relayd_id =
1322 msg.u.close_trace_chunk.relayd_id.value;
1323 struct lttcomm_consumer_close_trace_chunk_reply reply;
1324 char path[LTTNG_PATH_MAX];
1325 ssize_t ret_send;
1326
1327 ret_code = lttng_consumer_close_trace_chunk(
1328 msg.u.close_trace_chunk.relayd_id.is_set ?
1329 &relayd_id :
1330 NULL,
1331 msg.u.close_trace_chunk.session_id,
1332 msg.u.close_trace_chunk.chunk_id,
1333 (time_t) msg.u.close_trace_chunk.close_timestamp,
1334 msg.u.close_trace_chunk.close_command.is_set ?
1335 &close_command :
1336 NULL, path);
1337 reply.ret_code = ret_code;
1338 reply.path_length = strlen(path) + 1;
1339 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1340 if (ret_send != sizeof(reply)) {
1341 goto error_fatal;
1342 }
1343 ret_send = lttcomm_send_unix_sock(
1344 sock, path, reply.path_length);
1345 if (ret_send != reply.path_length) {
1346 goto error_fatal;
1347 }
1348 goto end_nosignal;
1349 }
1350 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
1351 {
1352 const uint64_t relayd_id =
1353 msg.u.trace_chunk_exists.relayd_id.value;
1354
1355 ret_code = lttng_consumer_trace_chunk_exists(
1356 msg.u.trace_chunk_exists.relayd_id.is_set ?
1357 &relayd_id : NULL,
1358 msg.u.trace_chunk_exists.session_id,
1359 msg.u.trace_chunk_exists.chunk_id);
1360 goto end_msg_sessiond;
1361 }
1362 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1363 {
1364 const uint64_t key = msg.u.open_channel_packets.key;
1365 struct lttng_consumer_channel *channel =
1366 consumer_find_channel(key);
1367
1368 if (channel) {
1369 pthread_mutex_lock(&channel->lock);
1370 ret_code = lttng_consumer_open_channel_packets(channel);
1371 pthread_mutex_unlock(&channel->lock);
1372 } else {
1373 WARN("Channel %" PRIu64 " not found", key);
1374 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1375 }
1376
1377 health_code_update();
1378 goto end_msg_sessiond;
1379 }
1380 default:
1381 goto end_nosignal;
1382 }
1383
1384 end_nosignal:
1385 /*
1386 * Return 1 to indicate success since the 0 value can be a socket
1387 * shutdown during the recv() or send() call.
1388 */
1389 ret_func = 1;
1390 goto end;
1391 error_fatal:
1392 /* This will issue a consumer stop. */
1393 ret_func = -1;
1394 goto end;
1395 end_msg_sessiond:
1396 /*
1397 * The returned value here is not useful since either way we'll return 1 to
1398 * the caller because the session daemon socket management is done
1399 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1400 */
1401 {
1402 int ret_send_status;
1403
1404 ret_send_status = consumer_send_status_msg(sock, ret_code);
1405 if (ret_send_status < 0) {
1406 goto error_fatal;
1407 }
1408 }
1409
1410 ret_func = 1;
1411
1412 end:
1413 health_code_update();
1414 rcu_read_unlock();
1415 return ret_func;
1416 }
1417
/*
 * Sync metadata, meaning request it from the session daemon and take a
 * snapshot so that the metadata thread can consume it.
 *
 * Metadata stream lock MUST be acquired.
 */
1424 enum sync_metadata_status lttng_kconsumer_sync_metadata(
1425 struct lttng_consumer_stream *metadata)
1426 {
1427 int ret;
1428 enum sync_metadata_status status;
1429
1430 LTTNG_ASSERT(metadata);
1431
1432 ret = kernctl_buffer_flush(metadata->wait_fd);
1433 if (ret < 0) {
1434 ERR("Failed to flush kernel stream");
1435 status = SYNC_METADATA_STATUS_ERROR;
1436 goto end;
1437 }
1438
1439 ret = kernctl_snapshot(metadata->wait_fd);
1440 if (ret < 0) {
1441 if (errno == EAGAIN) {
1442 /* No new metadata, exit. */
1443 DBG("Sync metadata, no new kernel metadata");
1444 status = SYNC_METADATA_STATUS_NO_DATA;
1445 } else {
1446 ERR("Sync metadata, taking kernel snapshot failed.");
1447 status = SYNC_METADATA_STATUS_ERROR;
1448 }
1449 } else {
1450 status = SYNC_METADATA_STATUS_NEW_DATA;
1451 }
1452
1453 end:
1454 return status;
1455 }
1456
1457 static
1458 int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1459 struct stream_subbuffer *subbuf)
1460 {
1461 int ret;
1462
1463 ret = kernctl_get_subbuf_size(
1464 stream->wait_fd, &subbuf->info.data.subbuf_size);
1465 if (ret) {
1466 goto end;
1467 }
1468
1469 ret = kernctl_get_padded_subbuf_size(
1470 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1471 if (ret) {
1472 goto end;
1473 }
1474
1475 end:
1476 return ret;
1477 }
1478
1479 static
1480 int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1481 struct stream_subbuffer *subbuf)
1482 {
1483 int ret;
1484
1485 ret = extract_common_subbuffer_info(stream, subbuf);
1486 if (ret) {
1487 goto end;
1488 }
1489
1490 ret = kernctl_get_metadata_version(
1491 stream->wait_fd, &subbuf->info.metadata.version);
1492 if (ret) {
1493 goto end;
1494 }
1495
1496 end:
1497 return ret;
1498 }
1499
1500 static
1501 int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1502 struct stream_subbuffer *subbuf)
1503 {
1504 int ret;
1505
1506 ret = extract_common_subbuffer_info(stream, subbuf);
1507 if (ret) {
1508 goto end;
1509 }
1510
1511 ret = kernctl_get_packet_size(
1512 stream->wait_fd, &subbuf->info.data.packet_size);
1513 if (ret < 0) {
1514 PERROR("Failed to get sub-buffer packet size");
1515 goto end;
1516 }
1517
1518 ret = kernctl_get_content_size(
1519 stream->wait_fd, &subbuf->info.data.content_size);
1520 if (ret < 0) {
1521 PERROR("Failed to get sub-buffer content size");
1522 goto end;
1523 }
1524
1525 ret = kernctl_get_timestamp_begin(
1526 stream->wait_fd, &subbuf->info.data.timestamp_begin);
1527 if (ret < 0) {
1528 PERROR("Failed to get sub-buffer begin timestamp");
1529 goto end;
1530 }
1531
1532 ret = kernctl_get_timestamp_end(
1533 stream->wait_fd, &subbuf->info.data.timestamp_end);
1534 if (ret < 0) {
1535 PERROR("Failed to get sub-buffer end timestamp");
1536 goto end;
1537 }
1538
1539 ret = kernctl_get_events_discarded(
1540 stream->wait_fd, &subbuf->info.data.events_discarded);
1541 if (ret) {
1542 PERROR("Failed to get sub-buffer events discarded count");
1543 goto end;
1544 }
1545
1546 ret = kernctl_get_sequence_number(stream->wait_fd,
1547 &subbuf->info.data.sequence_number.value);
1548 if (ret) {
1549 /* May not be supported by older LTTng-modules. */
1550 if (ret != -ENOTTY) {
1551 PERROR("Failed to get sub-buffer sequence number");
1552 goto end;
1553 }
1554 } else {
1555 subbuf->info.data.sequence_number.is_set = true;
1556 }
1557
1558 ret = kernctl_get_stream_id(
1559 stream->wait_fd, &subbuf->info.data.stream_id);
1560 if (ret < 0) {
1561 PERROR("Failed to get stream id");
1562 goto end;
1563 }
1564
1565 ret = kernctl_get_instance_id(stream->wait_fd,
1566 &subbuf->info.data.stream_instance_id.value);
1567 if (ret) {
1568 /* May not be supported by older LTTng-modules. */
1569 if (ret != -ENOTTY) {
1570 PERROR("Failed to get stream instance id");
1571 goto end;
1572 }
1573 } else {
1574 subbuf->info.data.stream_instance_id.is_set = true;
1575 }
1576 end:
1577 return ret;
1578 }
1579
1580 static
1581 enum get_next_subbuffer_status get_subbuffer_common(
1582 struct lttng_consumer_stream *stream,
1583 struct stream_subbuffer *subbuffer)
1584 {
1585 int ret;
1586 enum get_next_subbuffer_status status;
1587
1588 ret = kernctl_get_next_subbuf(stream->wait_fd);
1589 switch (ret) {
1590 case 0:
1591 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1592 break;
1593 case -ENODATA:
1594 case -EAGAIN:
1595 /*
1596 * The caller only expects -ENODATA when there is no data to
1597 * read, but the kernel tracer returns -EAGAIN when there is
1598 * currently no data for a non-finalized stream, and -ENODATA
1599 * when there is no data for a finalized stream. Those can be
1600 * combined into a -ENODATA return value.
1601 */
1602 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1603 goto end;
1604 default:
1605 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1606 goto end;
1607 }
1608
1609 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1610 stream, subbuffer);
1611 if (ret) {
1612 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1613 }
1614 end:
1615 return status;
1616 }
1617
1618 static
1619 enum get_next_subbuffer_status get_next_subbuffer_splice(
1620 struct lttng_consumer_stream *stream,
1621 struct stream_subbuffer *subbuffer)
1622 {
1623 const enum get_next_subbuffer_status status =
1624 get_subbuffer_common(stream, subbuffer);
1625
1626 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1627 goto end;
1628 }
1629
1630 subbuffer->buffer.fd = stream->wait_fd;
1631 end:
1632 return status;
1633 }
1634
1635 static
1636 enum get_next_subbuffer_status get_next_subbuffer_mmap(
1637 struct lttng_consumer_stream *stream,
1638 struct stream_subbuffer *subbuffer)
1639 {
1640 int ret;
1641 enum get_next_subbuffer_status status;
1642 const char *addr;
1643
1644 status = get_subbuffer_common(stream, subbuffer);
1645 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
1646 goto end;
1647 }
1648
1649 ret = get_current_subbuf_addr(stream, &addr);
1650 if (ret) {
1651 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1652 goto end;
1653 }
1654
1655 subbuffer->buffer.buffer = lttng_buffer_view_init(
1656 addr, 0, subbuffer->info.data.padded_subbuf_size);
1657 end:
1658 return status;
1659 }
1660
1661 static
1662 enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
1663 struct stream_subbuffer *subbuffer)
1664 {
1665 int ret;
1666 const char *addr;
1667 bool coherent;
1668 enum get_next_subbuffer_status status;
1669
1670 ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
1671 &coherent);
1672 if (ret) {
1673 goto end;
1674 }
1675
1676 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1677 stream, subbuffer);
1678 if (ret) {
1679 goto end;
1680 }
1681
1682 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
1683
1684 ret = get_current_subbuf_addr(stream, &addr);
1685 if (ret) {
1686 goto end;
1687 }
1688
1689 subbuffer->buffer.buffer = lttng_buffer_view_init(
1690 addr, 0, subbuffer->info.data.padded_subbuf_size);
1691 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1692 subbuffer->info.metadata.padded_subbuf_size,
1693 coherent ? "true" : "false");
1694 end:
1695 /*
1696 * The caller only expects -ENODATA when there is no data to read, but
1697 * the kernel tracer returns -EAGAIN when there is currently no data
1698 * for a non-finalized stream, and -ENODATA when there is no data for a
1699 * finalized stream. Those can be combined into a -ENODATA return value.
1700 */
1701 switch (ret) {
1702 case 0:
1703 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1704 break;
1705 case -ENODATA:
1706 case -EAGAIN:
1707 /*
1708 * The caller only expects -ENODATA when there is no data to
1709 * read, but the kernel tracer returns -EAGAIN when there is
1710 * currently no data for a non-finalized stream, and -ENODATA
1711 * when there is no data for a finalized stream. Those can be
1712 * combined into a -ENODATA return value.
1713 */
1714 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1715 break;
1716 default:
1717 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1718 break;
1719 }
1720
1721 return status;
1722 }
1723
1724 static
1725 int put_next_subbuffer(struct lttng_consumer_stream *stream,
1726 struct stream_subbuffer *subbuffer __attribute__((unused)))
1727 {
1728 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1729
1730 if (ret) {
1731 if (ret == -EFAULT) {
1732 PERROR("Error in unreserving sub buffer");
1733 } else if (ret == -EIO) {
1734 /* Should never happen with newer LTTng versions */
1735 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
1736 }
1737 }
1738
1739 return ret;
1740 }
1741
1742 static
1743 bool is_get_next_check_metadata_available(int tracer_fd)
1744 {
1745 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1746 const bool available = ret != -ENOTTY;
1747
1748 if (ret == 0) {
1749 /* get succeeded, make sure to put the subbuffer. */
1750 kernctl_put_subbuf(tracer_fd);
1751 }
1752
1753 return available;
1754 }
1755
1756 static
1757 int signal_metadata(struct lttng_consumer_stream *stream,
1758 struct lttng_consumer_local_data *ctx __attribute__((unused)))
1759 {
1760 ASSERT_LOCKED(stream->metadata_rdv_lock);
1761 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1762 }
1763
1764 static
1765 int lttng_kconsumer_set_stream_ops(
1766 struct lttng_consumer_stream *stream)
1767 {
1768 int ret = 0;
1769
1770 if (stream->metadata_flag && stream->chan->is_live) {
1771 DBG("Attempting to enable metadata bucketization for live consumers");
1772 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1773 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1774 stream->read_subbuffer_ops.get_next_subbuffer =
1775 get_next_subbuffer_metadata_check;
1776 ret = consumer_stream_enable_metadata_bucketization(
1777 stream);
1778 if (ret) {
1779 goto end;
1780 }
1781 } else {
1782 /*
1783 * The kernel tracer version is too old to indicate
1784 * when the metadata stream has reached a "coherent"
1785 * (parseable) point.
1786 *
1787 * This means that a live viewer may see an incoherent
1788 * sequence of metadata and fail to parse it.
1789 */
1790 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1791 metadata_bucket_destroy(stream->metadata_bucket);
1792 stream->metadata_bucket = NULL;
1793 }
1794
1795 stream->read_subbuffer_ops.on_sleep = signal_metadata;
1796 }
1797
1798 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1799 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1800 stream->read_subbuffer_ops.get_next_subbuffer =
1801 get_next_subbuffer_mmap;
1802 } else {
1803 stream->read_subbuffer_ops.get_next_subbuffer =
1804 get_next_subbuffer_splice;
1805 }
1806 }
1807
1808 if (stream->metadata_flag) {
1809 stream->read_subbuffer_ops.extract_subbuffer_info =
1810 extract_metadata_subbuffer_info;
1811 } else {
1812 stream->read_subbuffer_ops.extract_subbuffer_info =
1813 extract_data_subbuffer_info;
1814 if (stream->chan->is_live) {
1815 stream->read_subbuffer_ops.send_live_beacon =
1816 consumer_flush_kernel_index;
1817 }
1818 }
1819
1820 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
1821 end:
1822 return ret;
1823 }
1824
1825 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1826 {
1827 int ret;
1828
1829 LTTNG_ASSERT(stream);
1830
1831 /*
1832 * Don't create anything if this is set for streaming or if there is
1833 * no current trace chunk on the parent channel.
1834 */
1835 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1836 stream->chan->trace_chunk) {
1837 ret = consumer_stream_create_output_files(stream, true);
1838 if (ret) {
1839 goto error;
1840 }
1841 }
1842
1843 if (stream->output == LTTNG_EVENT_MMAP) {
1844 /* get the len of the mmap region */
1845 unsigned long mmap_len;
1846
1847 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1848 if (ret != 0) {
1849 PERROR("kernctl_get_mmap_len");
1850 goto error_close_fd;
1851 }
1852 stream->mmap_len = (size_t) mmap_len;
1853
1854 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1855 MAP_PRIVATE, stream->wait_fd, 0);
1856 if (stream->mmap_base == MAP_FAILED) {
1857 PERROR("Error mmaping");
1858 ret = -1;
1859 goto error_close_fd;
1860 }
1861 }
1862
1863 ret = lttng_kconsumer_set_stream_ops(stream);
1864 if (ret) {
1865 goto error_close_fd;
1866 }
1867
1868 /* we return 0 to let the library handle the FD internally */
1869 return 0;
1870
1871 error_close_fd:
1872 if (stream->out_fd >= 0) {
1873 int err;
1874
1875 err = close(stream->out_fd);
1876 LTTNG_ASSERT(!err);
1877 stream->out_fd = -1;
1878 }
1879 error:
1880 return ret;
1881 }
1882
1883 /*
1884 * Check if data is still being extracted from the buffers for a specific
1885 * stream. Consumer data lock MUST be acquired before calling this function
1886 * and the stream lock.
1887 *
1888 * Return 1 if the traced data are still getting read else 0 meaning that the
1889 * data is available for trace viewer reading.
1890 */
1891 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
1892 {
1893 int ret;
1894
1895 LTTNG_ASSERT(stream);
1896
1897 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1898 ret = 0;
1899 goto end;
1900 }
1901
1902 ret = kernctl_get_next_subbuf(stream->wait_fd);
1903 if (ret == 0) {
1904 /* There is still data so let's put back this subbuffer. */
1905 ret = kernctl_put_subbuf(stream->wait_fd);
1906 LTTNG_ASSERT(ret == 0);
1907 ret = 1; /* Data is pending */
1908 goto end;
1909 }
1910
1911 /* Data is NOT pending and ready to be read. */
1912 ret = 0;
1913
1914 end:
1915 return ret;
1916 }
This page took 0.136753 seconds and 4 git commands to generate.