consumerd: move address computation from on_read_subbuffer_mmap
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <stdint.h>
21 #define _LGPL_SOURCE
22 #include <assert.h>
23 #include <poll.h>
24 #include <pthread.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/mman.h>
28 #include <sys/socket.h>
29 #include <sys/types.h>
30 #include <inttypes.h>
31 #include <unistd.h>
32 #include <sys/stat.h>
33
34 #include <bin/lttng-consumerd/health-consumerd.h>
35 #include <common/common.h>
36 #include <common/kernel-ctl/kernel-ctl.h>
37 #include <common/sessiond-comm/sessiond-comm.h>
38 #include <common/sessiond-comm/relayd.h>
39 #include <common/compat/fcntl.h>
40 #include <common/compat/endian.h>
41 #include <common/pipe.h>
42 #include <common/relayd/relayd.h>
43 #include <common/utils.h>
44 #include <common/consumer/consumer-stream.h>
45 #include <common/index/index.h>
46 #include <common/consumer/consumer-timer.h>
47 #include <common/optional.h>
48
49 #include "kernel-consumer.h"
50
/*
 * Objects declared here and defined outside of this translation unit.
 * NOTE(review): presumably the consumer-wide shared state and the poll
 * timeout used by the consumer threads -- confirm against their definitions.
 */
51 extern struct lttng_consumer_global_data consumer_data;
52 extern int consumer_poll_timeout;
53
54 /*
55 * Take a snapshot for a specific fd
56 *
57 * Returns 0 on success, < 0 on error
58 */
59 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
60 {
61 int ret = 0;
62 int infd = stream->wait_fd;
63
64 ret = kernctl_snapshot(infd);
65 /*
66 * -EAGAIN is not an error, it just means that there is no data to
67 * be read.
68 */
69 if (ret != 0 && ret != -EAGAIN) {
70 PERROR("Getting sub-buffer snapshot.");
71 }
72
73 return ret;
74 }
75
76 /*
77 * Sample consumed and produced positions for a specific fd.
78 *
79 * Returns 0 on success, < 0 on error.
80 */
81 int lttng_kconsumer_sample_snapshot_positions(
82 struct lttng_consumer_stream *stream)
83 {
84 assert(stream);
85
86 return kernctl_snapshot_sample_positions(stream->wait_fd);
87 }
88
89 /*
90 * Get the produced position
91 *
92 * Returns 0 on success, < 0 on error
93 */
94 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
95 unsigned long *pos)
96 {
97 int ret;
98 int infd = stream->wait_fd;
99
100 ret = kernctl_snapshot_get_produced(infd, pos);
101 if (ret != 0) {
102 PERROR("kernctl_snapshot_get_produced");
103 }
104
105 return ret;
106 }
107
108 /*
109 * Get the consumerd position
110 *
111 * Returns 0 on success, < 0 on error
112 */
113 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
114 unsigned long *pos)
115 {
116 int ret;
117 int infd = stream->wait_fd;
118
119 ret = kernctl_snapshot_get_consumed(infd, pos);
120 if (ret != 0) {
121 PERROR("kernctl_snapshot_get_consumed");
122 }
123
124 return ret;
125 }
126
127 static
128 int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
129 const char **addr)
130 {
131 int ret;
132 unsigned long mmap_offset;
133 const char *mmap_base = stream->mmap_base;
134
135 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
136 if (ret < 0) {
137 PERROR("Failed to get mmap read offset");
138 goto error;
139 }
140
141 *addr = mmap_base + mmap_offset;
142 error:
143 return ret;
144 }
145
146 /*
147 * Take a snapshot of all the stream of a channel
148 * RCU read-side lock must be held across this function to ensure existence of
149 * channel. The channel lock must be held by the caller.
150 *
151 * Returns 0 on success, < 0 on error
152 */
153 static int lttng_kconsumer_snapshot_channel(
154 struct lttng_consumer_channel *channel,
155 uint64_t key, char *path, uint64_t relayd_id,
156 uint64_t nb_packets_per_stream,
157 struct lttng_consumer_local_data *ctx)
158 {
159 int ret;
160 struct lttng_consumer_stream *stream;
161
162 DBG("Kernel consumer snapshot channel %" PRIu64, key);
163
164 rcu_read_lock();
165
166 /* Splice is not supported yet for channel snapshot. */
167 if (channel->output != CONSUMER_CHANNEL_MMAP) {
168 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
169 channel->name);
170 ret = -1;
171 goto end;
172 }
173
174 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
175 unsigned long consumed_pos, produced_pos;
176
177 health_code_update();
178
179 /*
180 * Lock stream because we are about to change its state.
181 */
182 pthread_mutex_lock(&stream->lock);
183
184 assert(channel->trace_chunk);
185 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
186 /*
187 * Can't happen barring an internal error as the channel
188 * holds a reference to the trace chunk.
189 */
190 ERR("Failed to acquire reference to channel's trace chunk");
191 ret = -1;
192 goto end_unlock;
193 }
194 assert(!stream->trace_chunk);
195 stream->trace_chunk = channel->trace_chunk;
196
197 /*
198 * Assign the received relayd ID so we can use it for streaming. The streams
199 * are not visible to anyone so this is OK to change it.
200 */
201 stream->net_seq_idx = relayd_id;
202 channel->relayd_id = relayd_id;
203 if (relayd_id != (uint64_t) -1ULL) {
204 ret = consumer_send_relayd_stream(stream, path);
205 if (ret < 0) {
206 ERR("sending stream to relayd");
207 goto end_unlock;
208 }
209 } else {
210 ret = consumer_stream_create_output_files(stream,
211 false);
212 if (ret < 0) {
213 goto end_unlock;
214 }
215 DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
216 stream->key);
217 }
218
219 ret = kernctl_buffer_flush_empty(stream->wait_fd);
220 if (ret < 0) {
221 /*
222 * Doing a buffer flush which does not take into
223 * account empty packets. This is not perfect
224 * for stream intersection, but required as a
225 * fall-back when "flush_empty" is not
226 * implemented by lttng-modules.
227 */
228 ret = kernctl_buffer_flush(stream->wait_fd);
229 if (ret < 0) {
230 ERR("Failed to flush kernel stream");
231 goto end_unlock;
232 }
233 goto end_unlock;
234 }
235
236 ret = lttng_kconsumer_take_snapshot(stream);
237 if (ret < 0) {
238 ERR("Taking kernel snapshot");
239 goto end_unlock;
240 }
241
242 ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
243 if (ret < 0) {
244 ERR("Produced kernel snapshot position");
245 goto end_unlock;
246 }
247
248 ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
249 if (ret < 0) {
250 ERR("Consumerd kernel snapshot position");
251 goto end_unlock;
252 }
253
254 consumed_pos = consumer_get_consume_start_pos(consumed_pos,
255 produced_pos, nb_packets_per_stream,
256 stream->max_sb_size);
257
258 while ((long) (consumed_pos - produced_pos) < 0) {
259 ssize_t read_len;
260 unsigned long len, padded_len;
261 const char *subbuf_addr;
262
263 health_code_update();
264
265 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
266
267 ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
268 if (ret < 0) {
269 if (ret != -EAGAIN) {
270 PERROR("kernctl_get_subbuf snapshot");
271 goto end_unlock;
272 }
273 DBG("Kernel consumer get subbuf failed. Skipping it.");
274 consumed_pos += stream->max_sb_size;
275 stream->chan->lost_packets++;
276 continue;
277 }
278
279 ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
280 if (ret < 0) {
281 ERR("Snapshot kernctl_get_subbuf_size");
282 goto error_put_subbuf;
283 }
284
285 ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
286 if (ret < 0) {
287 ERR("Snapshot kernctl_get_padded_subbuf_size");
288 goto error_put_subbuf;
289 }
290
291 ret = get_current_subbuf_addr(stream, &subbuf_addr);
292 if (ret) {
293 goto error_put_subbuf;
294 }
295
296 read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
297 stream, subbuf_addr, len,
298 padded_len - len, NULL);
299 /*
300 * We write the padded len in local tracefiles but the data len
301 * when using a relay. Display the error but continue processing
302 * to try to release the subbuffer.
303 */
304 if (relayd_id != (uint64_t) -1ULL) {
305 if (read_len != len) {
306 ERR("Error sending to the relay (ret: %zd != len: %lu)",
307 read_len, len);
308 }
309 } else {
310 if (read_len != padded_len) {
311 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
312 read_len, padded_len);
313 }
314 }
315
316 ret = kernctl_put_subbuf(stream->wait_fd);
317 if (ret < 0) {
318 ERR("Snapshot kernctl_put_subbuf");
319 goto end_unlock;
320 }
321 consumed_pos += stream->max_sb_size;
322 }
323
324 if (relayd_id == (uint64_t) -1ULL) {
325 if (stream->out_fd >= 0) {
326 ret = close(stream->out_fd);
327 if (ret < 0) {
328 PERROR("Kernel consumer snapshot close out_fd");
329 goto end_unlock;
330 }
331 stream->out_fd = -1;
332 }
333 } else {
334 close_relayd_stream(stream);
335 stream->net_seq_idx = (uint64_t) -1ULL;
336 }
337 lttng_trace_chunk_put(stream->trace_chunk);
338 stream->trace_chunk = NULL;
339 pthread_mutex_unlock(&stream->lock);
340 }
341
342 /* All good! */
343 ret = 0;
344 goto end;
345
346 error_put_subbuf:
347 ret = kernctl_put_subbuf(stream->wait_fd);
348 if (ret < 0) {
349 ERR("Snapshot kernctl_put_subbuf error path");
350 }
351 end_unlock:
352 pthread_mutex_unlock(&stream->lock);
353 end:
354 rcu_read_unlock();
355 return ret;
356 }
357
358 /*
359 * Read the whole metadata available for a snapshot.
360 * RCU read-side lock must be held across this function to ensure existence of
361 * metadata_channel. The channel lock must be held by the caller.
362 *
363 * Returns 0 on success, < 0 on error
364 */
365 static int lttng_kconsumer_snapshot_metadata(
366 struct lttng_consumer_channel *metadata_channel,
367 uint64_t key, char *path, uint64_t relayd_id,
368 struct lttng_consumer_local_data *ctx)
369 {
370 int ret, use_relayd = 0;
371 ssize_t ret_read;
372 struct lttng_consumer_stream *metadata_stream;
373
374 assert(ctx);
375
376 DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
377 key, path);
378
379 rcu_read_lock();
380
381 metadata_stream = metadata_channel->metadata_stream;
382 assert(metadata_stream);
383
384 pthread_mutex_lock(&metadata_stream->lock);
385 assert(metadata_channel->trace_chunk);
386 assert(metadata_stream->trace_chunk);
387
388 /* Flag once that we have a valid relayd for the stream. */
389 if (relayd_id != (uint64_t) -1ULL) {
390 use_relayd = 1;
391 }
392
393 if (use_relayd) {
394 ret = consumer_send_relayd_stream(metadata_stream, path);
395 if (ret < 0) {
396 goto error_snapshot;
397 }
398 } else {
399 ret = consumer_stream_create_output_files(metadata_stream,
400 false);
401 if (ret < 0) {
402 goto error_snapshot;
403 }
404 }
405
406 do {
407 health_code_update();
408
409 ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
410 if (ret_read < 0) {
411 if (ret_read != -EAGAIN) {
412 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
413 ret_read);
414 ret = ret_read;
415 goto error_snapshot;
416 }
417 /* ret_read is negative at this point so we will exit the loop. */
418 continue;
419 }
420 } while (ret_read >= 0);
421
422 if (use_relayd) {
423 close_relayd_stream(metadata_stream);
424 metadata_stream->net_seq_idx = (uint64_t) -1ULL;
425 } else {
426 if (metadata_stream->out_fd >= 0) {
427 ret = close(metadata_stream->out_fd);
428 if (ret < 0) {
429 PERROR("Kernel consumer snapshot metadata close out_fd");
430 /*
431 * Don't go on error here since the snapshot was successful at this
432 * point but somehow the close failed.
433 */
434 }
435 metadata_stream->out_fd = -1;
436 lttng_trace_chunk_put(metadata_stream->trace_chunk);
437 metadata_stream->trace_chunk = NULL;
438 }
439 }
440
441 ret = 0;
442 error_snapshot:
443 pthread_mutex_unlock(&metadata_stream->lock);
444 cds_list_del(&metadata_stream->send_node);
445 consumer_stream_destroy(metadata_stream, NULL);
446 metadata_channel->metadata_stream = NULL;
447 rcu_read_unlock();
448 return ret;
449 }
450
451 /*
452 * Receive command from session daemon and process it.
453 *
454 * Return 1 on success else a negative value or 0.
455 */
456 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
457 int sock, struct pollfd *consumer_sockpoll)
458 {
459 ssize_t ret;
460 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
461 struct lttcomm_consumer_msg msg;
462
463 health_code_update();
464
465 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
466 if (ret != sizeof(msg)) {
467 if (ret > 0) {
468 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
469 ret = -1;
470 }
471 return ret;
472 }
473
474 health_code_update();
475
476 /* Deprecated command */
477 assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
478
479 health_code_update();
480
481 /* relayd needs RCU read-side protection */
482 rcu_read_lock();
483
484 switch (msg.cmd_type) {
485 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
486 {
487 /* Session daemon status message are handled in the following call. */
488 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
489 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
490 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
491 msg.u.relayd_sock.relayd_session_id);
492 goto end_nosignal;
493 }
494 case LTTNG_CONSUMER_ADD_CHANNEL:
495 {
496 struct lttng_consumer_channel *new_channel;
497 int ret_recv;
498 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
499
500 health_code_update();
501
502 /* First send a status message before receiving the fds. */
503 ret = consumer_send_status_msg(sock, ret_code);
504 if (ret < 0) {
505 /* Somehow, the session daemon is not responding anymore. */
506 goto error_fatal;
507 }
508
509 health_code_update();
510
511 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
512 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
513 msg.u.channel.session_id,
514 msg.u.channel.chunk_id.is_set ?
515 &chunk_id : NULL,
516 msg.u.channel.pathname,
517 msg.u.channel.name,
518 msg.u.channel.relayd_id, msg.u.channel.output,
519 msg.u.channel.tracefile_size,
520 msg.u.channel.tracefile_count, 0,
521 msg.u.channel.monitor,
522 msg.u.channel.live_timer_interval,
523 NULL, NULL);
524 if (new_channel == NULL) {
525 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
526 goto end_nosignal;
527 }
528 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
529 switch (msg.u.channel.output) {
530 case LTTNG_EVENT_SPLICE:
531 new_channel->output = CONSUMER_CHANNEL_SPLICE;
532 break;
533 case LTTNG_EVENT_MMAP:
534 new_channel->output = CONSUMER_CHANNEL_MMAP;
535 break;
536 default:
537 ERR("Channel output unknown %d", msg.u.channel.output);
538 goto end_nosignal;
539 }
540
541 /* Translate and save channel type. */
542 switch (msg.u.channel.type) {
543 case CONSUMER_CHANNEL_TYPE_DATA:
544 case CONSUMER_CHANNEL_TYPE_METADATA:
545 new_channel->type = msg.u.channel.type;
546 break;
547 default:
548 assert(0);
549 goto end_nosignal;
550 };
551
552 health_code_update();
553
554 if (ctx->on_recv_channel != NULL) {
555 ret_recv = ctx->on_recv_channel(new_channel);
556 if (ret_recv == 0) {
557 ret = consumer_add_channel(new_channel, ctx);
558 } else if (ret_recv < 0) {
559 goto end_nosignal;
560 }
561 } else {
562 ret = consumer_add_channel(new_channel, ctx);
563 }
564 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
565 int monitor_start_ret;
566
567 DBG("Consumer starting monitor timer");
568 consumer_timer_live_start(new_channel,
569 msg.u.channel.live_timer_interval);
570 monitor_start_ret = consumer_timer_monitor_start(
571 new_channel,
572 msg.u.channel.monitor_timer_interval);
573 if (monitor_start_ret < 0) {
574 ERR("Starting channel monitoring timer failed");
575 goto end_nosignal;
576 }
577
578 }
579
580 health_code_update();
581
582 /* If we received an error in add_channel, we need to report it. */
583 if (ret < 0) {
584 ret = consumer_send_status_msg(sock, ret);
585 if (ret < 0) {
586 goto error_fatal;
587 }
588 goto end_nosignal;
589 }
590
591 goto end_nosignal;
592 }
593 case LTTNG_CONSUMER_ADD_STREAM:
594 {
595 int fd;
596 struct lttng_pipe *stream_pipe;
597 struct lttng_consumer_stream *new_stream;
598 struct lttng_consumer_channel *channel;
599 int alloc_ret = 0;
600
601 /*
602 * Get stream's channel reference. Needed when adding the stream to the
603 * global hash table.
604 */
605 channel = consumer_find_channel(msg.u.stream.channel_key);
606 if (!channel) {
607 /*
608 * We could not find the channel. Can happen if cpu hotplug
609 * happens while tearing down.
610 */
611 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
612 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
613 }
614
615 health_code_update();
616
617 /* First send a status message before receiving the fds. */
618 ret = consumer_send_status_msg(sock, ret_code);
619 if (ret < 0) {
620 /* Somehow, the session daemon is not responding anymore. */
621 goto error_add_stream_fatal;
622 }
623
624 health_code_update();
625
626 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
627 /* Channel was not found. */
628 goto error_add_stream_nosignal;
629 }
630
631 /* Blocking call */
632 health_poll_entry();
633 ret = lttng_consumer_poll_socket(consumer_sockpoll);
634 health_poll_exit();
635 if (ret) {
636 goto error_add_stream_fatal;
637 }
638
639 health_code_update();
640
641 /* Get stream file descriptor from socket */
642 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
643 if (ret != sizeof(fd)) {
644 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
645 goto end;
646 }
647
648 health_code_update();
649
650 /*
651 * Send status code to session daemon only if the recv works. If the
652 * above recv() failed, the session daemon is notified through the
653 * error socket and the teardown is eventually done.
654 */
655 ret = consumer_send_status_msg(sock, ret_code);
656 if (ret < 0) {
657 /* Somehow, the session daemon is not responding anymore. */
658 goto error_add_stream_nosignal;
659 }
660
661 health_code_update();
662
663 pthread_mutex_lock(&channel->lock);
664 new_stream = consumer_allocate_stream(channel->key,
665 fd,
666 channel->name,
667 channel->relayd_id,
668 channel->session_id,
669 channel->trace_chunk,
670 msg.u.stream.cpu,
671 &alloc_ret,
672 channel->type,
673 channel->monitor);
674 if (new_stream == NULL) {
675 switch (alloc_ret) {
676 case -ENOMEM:
677 case -EINVAL:
678 default:
679 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
680 break;
681 }
682 pthread_mutex_unlock(&channel->lock);
683 goto error_add_stream_nosignal;
684 }
685
686 new_stream->chan = channel;
687 new_stream->wait_fd = fd;
688 ret = kernctl_get_max_subbuf_size(new_stream->wait_fd,
689 &new_stream->max_sb_size);
690 if (ret < 0) {
691 pthread_mutex_unlock(&channel->lock);
692 ERR("Failed to get kernel maximal subbuffer size");
693 goto error_add_stream_nosignal;
694 }
695
696 consumer_stream_update_channel_attributes(new_stream,
697 channel);
698 switch (channel->output) {
699 case CONSUMER_CHANNEL_SPLICE:
700 new_stream->output = LTTNG_EVENT_SPLICE;
701 ret = utils_create_pipe(new_stream->splice_pipe);
702 if (ret < 0) {
703 pthread_mutex_unlock(&channel->lock);
704 goto error_add_stream_nosignal;
705 }
706 break;
707 case CONSUMER_CHANNEL_MMAP:
708 new_stream->output = LTTNG_EVENT_MMAP;
709 break;
710 default:
711 ERR("Stream output unknown %d", channel->output);
712 pthread_mutex_unlock(&channel->lock);
713 goto error_add_stream_nosignal;
714 }
715
716 /*
717 * We've just assigned the channel to the stream so increment the
718 * refcount right now. We don't need to increment the refcount for
719 * streams in no monitor because we handle manually the cleanup of
720 * those. It is very important to make sure there is NO prior
721 * consumer_del_stream() calls or else the refcount will be unbalanced.
722 */
723 if (channel->monitor) {
724 uatomic_inc(&new_stream->chan->refcount);
725 }
726
727 /*
728 * The buffer flush is done on the session daemon side for the kernel
729 * so no need for the stream "hangup_flush_done" variable to be
730 * tracked. This is important for a kernel stream since we don't rely
731 * on the flush state of the stream to read data. It's not the case for
732 * user space tracing.
733 */
734 new_stream->hangup_flush_done = 0;
735
736 health_code_update();
737
738 pthread_mutex_lock(&new_stream->lock);
739 if (ctx->on_recv_stream) {
740 ret = ctx->on_recv_stream(new_stream);
741 if (ret < 0) {
742 pthread_mutex_unlock(&new_stream->lock);
743 pthread_mutex_unlock(&channel->lock);
744 consumer_stream_free(new_stream);
745 goto error_add_stream_nosignal;
746 }
747 }
748 health_code_update();
749
750 if (new_stream->metadata_flag) {
751 channel->metadata_stream = new_stream;
752 }
753
754 /* Do not monitor this stream. */
755 if (!channel->monitor) {
756 DBG("Kernel consumer add stream %s in no monitor mode with "
757 "relayd id %" PRIu64, new_stream->name,
758 new_stream->net_seq_idx);
759 cds_list_add(&new_stream->send_node, &channel->streams.head);
760 pthread_mutex_unlock(&new_stream->lock);
761 pthread_mutex_unlock(&channel->lock);
762 goto end_add_stream;
763 }
764
765 /* Send stream to relayd if the stream has an ID. */
766 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
767 ret = consumer_send_relayd_stream(new_stream,
768 new_stream->chan->pathname);
769 if (ret < 0) {
770 pthread_mutex_unlock(&new_stream->lock);
771 pthread_mutex_unlock(&channel->lock);
772 consumer_stream_free(new_stream);
773 goto error_add_stream_nosignal;
774 }
775
776 /*
777 * If adding an extra stream to an already
778 * existing channel (e.g. cpu hotplug), we need
779 * to send the "streams_sent" command to relayd.
780 */
781 if (channel->streams_sent_to_relayd) {
782 ret = consumer_send_relayd_streams_sent(
783 new_stream->net_seq_idx);
784 if (ret < 0) {
785 pthread_mutex_unlock(&new_stream->lock);
786 pthread_mutex_unlock(&channel->lock);
787 goto error_add_stream_nosignal;
788 }
789 }
790 }
791 pthread_mutex_unlock(&new_stream->lock);
792 pthread_mutex_unlock(&channel->lock);
793
794 /* Get the right pipe where the stream will be sent. */
795 if (new_stream->metadata_flag) {
796 consumer_add_metadata_stream(new_stream);
797 stream_pipe = ctx->consumer_metadata_pipe;
798 } else {
799 consumer_add_data_stream(new_stream);
800 stream_pipe = ctx->consumer_data_pipe;
801 }
802
803 /* Visible to other threads */
804 new_stream->globally_visible = 1;
805
806 health_code_update();
807
808 ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
809 if (ret < 0) {
810 ERR("Consumer write %s stream to pipe %d",
811 new_stream->metadata_flag ? "metadata" : "data",
812 lttng_pipe_get_writefd(stream_pipe));
813 if (new_stream->metadata_flag) {
814 consumer_del_stream_for_metadata(new_stream);
815 } else {
816 consumer_del_stream_for_data(new_stream);
817 }
818 goto error_add_stream_nosignal;
819 }
820
821 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
822 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
823 end_add_stream:
824 break;
825 error_add_stream_nosignal:
826 goto end_nosignal;
827 error_add_stream_fatal:
828 goto error_fatal;
829 }
830 case LTTNG_CONSUMER_STREAMS_SENT:
831 {
832 struct lttng_consumer_channel *channel;
833
834 /*
835 * Get stream's channel reference. Needed when adding the stream to the
836 * global hash table.
837 */
838 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
839 if (!channel) {
840 /*
841 * We could not find the channel. Can happen if cpu hotplug
842 * happens while tearing down.
843 */
844 ERR("Unable to find channel key %" PRIu64,
845 msg.u.sent_streams.channel_key);
846 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
847 }
848
849 health_code_update();
850
851 /*
852 * Send status code to session daemon.
853 */
854 ret = consumer_send_status_msg(sock, ret_code);
855 if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
856 /* Somehow, the session daemon is not responding anymore. */
857 goto error_streams_sent_nosignal;
858 }
859
860 health_code_update();
861
862 /*
863 * We should not send this message if we don't monitor the
864 * streams in this channel.
865 */
866 if (!channel->monitor) {
867 goto end_error_streams_sent;
868 }
869
870 health_code_update();
871 /* Send stream to relayd if the stream has an ID. */
872 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
873 ret = consumer_send_relayd_streams_sent(
874 msg.u.sent_streams.net_seq_idx);
875 if (ret < 0) {
876 goto error_streams_sent_nosignal;
877 }
878 channel->streams_sent_to_relayd = true;
879 }
880 end_error_streams_sent:
881 break;
882 error_streams_sent_nosignal:
883 goto end_nosignal;
884 }
885 case LTTNG_CONSUMER_UPDATE_STREAM:
886 {
887 rcu_read_unlock();
888 return -ENOSYS;
889 }
890 case LTTNG_CONSUMER_DESTROY_RELAYD:
891 {
892 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
893 struct consumer_relayd_sock_pair *relayd;
894
895 DBG("Kernel consumer destroying relayd %" PRIu64, index);
896
897 /* Get relayd reference if exists. */
898 relayd = consumer_find_relayd(index);
899 if (relayd == NULL) {
900 DBG("Unable to find relayd %" PRIu64, index);
901 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
902 }
903
904 /*
905 * Each relayd socket pair has a refcount of stream attached to it
906 * which tells if the relayd is still active or not depending on the
907 * refcount value.
908 *
909 * This will set the destroy flag of the relayd object and destroy it
910 * if the refcount reaches zero when called.
911 *
912 * The destroy can happen either here or when a stream fd hangs up.
913 */
914 if (relayd) {
915 consumer_flag_relayd_for_destroy(relayd);
916 }
917
918 health_code_update();
919
920 ret = consumer_send_status_msg(sock, ret_code);
921 if (ret < 0) {
922 /* Somehow, the session daemon is not responding anymore. */
923 goto error_fatal;
924 }
925
926 goto end_nosignal;
927 }
928 case LTTNG_CONSUMER_DATA_PENDING:
929 {
930 int32_t ret;
931 uint64_t id = msg.u.data_pending.session_id;
932
933 DBG("Kernel consumer data pending command for id %" PRIu64, id);
934
935 ret = consumer_data_pending(id);
936
937 health_code_update();
938
939 /* Send back returned value to session daemon */
940 ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
941 if (ret < 0) {
942 PERROR("send data pending ret code");
943 goto error_fatal;
944 }
945
946 /*
947 * No need to send back a status message since the data pending
948 * returned value is the response.
949 */
950 break;
951 }
952 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
953 {
954 struct lttng_consumer_channel *channel;
955 uint64_t key = msg.u.snapshot_channel.key;
956
957 channel = consumer_find_channel(key);
958 if (!channel) {
959 ERR("Channel %" PRIu64 " not found", key);
960 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
961 } else {
962 pthread_mutex_lock(&channel->lock);
963 if (msg.u.snapshot_channel.metadata == 1) {
964 ret = lttng_kconsumer_snapshot_metadata(channel, key,
965 msg.u.snapshot_channel.pathname,
966 msg.u.snapshot_channel.relayd_id, ctx);
967 if (ret < 0) {
968 ERR("Snapshot metadata failed");
969 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
970 }
971 } else {
972 ret = lttng_kconsumer_snapshot_channel(channel, key,
973 msg.u.snapshot_channel.pathname,
974 msg.u.snapshot_channel.relayd_id,
975 msg.u.snapshot_channel.nb_packets_per_stream,
976 ctx);
977 if (ret < 0) {
978 ERR("Snapshot channel failed");
979 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
980 }
981 }
982 pthread_mutex_unlock(&channel->lock);
983 }
984 health_code_update();
985
986 ret = consumer_send_status_msg(sock, ret_code);
987 if (ret < 0) {
988 /* Somehow, the session daemon is not responding anymore. */
989 goto end_nosignal;
990 }
991 break;
992 }
993 case LTTNG_CONSUMER_DESTROY_CHANNEL:
994 {
995 uint64_t key = msg.u.destroy_channel.key;
996 struct lttng_consumer_channel *channel;
997
998 channel = consumer_find_channel(key);
999 if (!channel) {
1000 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
1001 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1002 }
1003
1004 health_code_update();
1005
1006 ret = consumer_send_status_msg(sock, ret_code);
1007 if (ret < 0) {
1008 /* Somehow, the session daemon is not responding anymore. */
1009 goto end_destroy_channel;
1010 }
1011
1012 health_code_update();
1013
1014 /* Stop right now if no channel was found. */
1015 if (!channel) {
1016 goto end_destroy_channel;
1017 }
1018
1019 /*
1020 * This command should ONLY be issued for channel with streams set in
1021 * no monitor mode.
1022 */
1023 assert(!channel->monitor);
1024
1025 /*
1026 * The refcount should ALWAYS be 0 in the case of a channel in no
1027 * monitor mode.
1028 */
1029 assert(!uatomic_sub_return(&channel->refcount, 1));
1030
1031 consumer_del_channel(channel);
1032 end_destroy_channel:
1033 goto end_nosignal;
1034 }
1035 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1036 {
1037 ssize_t ret;
1038 uint64_t count;
1039 struct lttng_consumer_channel *channel;
1040 uint64_t id = msg.u.discarded_events.session_id;
1041 uint64_t key = msg.u.discarded_events.channel_key;
1042
1043 DBG("Kernel consumer discarded events command for session id %"
1044 PRIu64 ", channel key %" PRIu64, id, key);
1045
1046 channel = consumer_find_channel(key);
1047 if (!channel) {
1048 ERR("Kernel consumer discarded events channel %"
1049 PRIu64 " not found", key);
1050 count = 0;
1051 } else {
1052 count = channel->discarded_events;
1053 }
1054
1055 health_code_update();
1056
1057 /* Send back returned value to session daemon */
1058 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1059 if (ret < 0) {
1060 PERROR("send discarded events");
1061 goto error_fatal;
1062 }
1063
1064 break;
1065 }
1066 case LTTNG_CONSUMER_LOST_PACKETS:
1067 {
1068 ssize_t ret;
1069 uint64_t count;
1070 struct lttng_consumer_channel *channel;
1071 uint64_t id = msg.u.lost_packets.session_id;
1072 uint64_t key = msg.u.lost_packets.channel_key;
1073
1074 DBG("Kernel consumer lost packets command for session id %"
1075 PRIu64 ", channel key %" PRIu64, id, key);
1076
1077 channel = consumer_find_channel(key);
1078 if (!channel) {
1079 ERR("Kernel consumer lost packets channel %"
1080 PRIu64 " not found", key);
1081 count = 0;
1082 } else {
1083 count = channel->lost_packets;
1084 }
1085
1086 health_code_update();
1087
1088 /* Send back returned value to session daemon */
1089 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
1090 if (ret < 0) {
1091 PERROR("send lost packets");
1092 goto error_fatal;
1093 }
1094
1095 break;
1096 }
1097 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1098 {
1099 int channel_monitor_pipe;
1100
1101 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1102 /* Successfully received the command's type. */
1103 ret = consumer_send_status_msg(sock, ret_code);
1104 if (ret < 0) {
1105 goto error_fatal;
1106 }
1107
1108 ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
1109 1);
1110 if (ret != sizeof(channel_monitor_pipe)) {
1111 ERR("Failed to receive channel monitor pipe");
1112 goto error_fatal;
1113 }
1114
1115 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1116 ret = consumer_timer_thread_set_channel_monitor_pipe(
1117 channel_monitor_pipe);
1118 if (!ret) {
1119 int flags;
1120
1121 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1122 /* Set the pipe as non-blocking. */
1123 ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
1124 if (ret == -1) {
1125 PERROR("fcntl get flags of the channel monitoring pipe");
1126 goto error_fatal;
1127 }
1128 flags = ret;
1129
1130 ret = fcntl(channel_monitor_pipe, F_SETFL,
1131 flags | O_NONBLOCK);
1132 if (ret == -1) {
1133 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1134 goto error_fatal;
1135 }
1136 DBG("Channel monitor pipe set as non-blocking");
1137 } else {
1138 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1139 }
1140 ret = consumer_send_status_msg(sock, ret_code);
1141 if (ret < 0) {
1142 goto error_fatal;
1143 }
1144 break;
1145 }
1146 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1147 {
1148 struct lttng_consumer_channel *channel;
1149 uint64_t key = msg.u.rotate_channel.key;
1150
1151 DBG("Consumer rotate channel %" PRIu64, key);
1152
1153 channel = consumer_find_channel(key);
1154 if (!channel) {
1155 ERR("Channel %" PRIu64 " not found", key);
1156 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1157 } else {
1158 /*
1159 * Sample the rotate position of all the streams in this channel.
1160 */
1161 ret = lttng_consumer_rotate_channel(channel, key,
1162 msg.u.rotate_channel.relayd_id,
1163 msg.u.rotate_channel.metadata,
1164 ctx);
1165 if (ret < 0) {
1166 ERR("Rotate channel failed");
1167 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1168 }
1169
1170 health_code_update();
1171 }
1172 ret = consumer_send_status_msg(sock, ret_code);
1173 if (ret < 0) {
1174 /* Somehow, the session daemon is not responding anymore. */
1175 goto error_rotate_channel;
1176 }
1177 if (channel) {
1178 /* Rotate the streams that are ready right now. */
1179 ret = lttng_consumer_rotate_ready_streams(
1180 channel, key, ctx);
1181 if (ret < 0) {
1182 ERR("Rotate ready streams failed");
1183 }
1184 }
1185 break;
1186 error_rotate_channel:
1187 goto end_nosignal;
1188 }
1189 case LTTNG_CONSUMER_INIT:
1190 {
1191 ret_code = lttng_consumer_init_command(ctx,
1192 msg.u.init.sessiond_uuid);
1193 health_code_update();
1194 ret = consumer_send_status_msg(sock, ret_code);
1195 if (ret < 0) {
1196 /* Somehow, the session daemon is not responding anymore. */
1197 goto end_nosignal;
1198 }
1199 break;
1200 }
1201 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
1202 {
1203 const struct lttng_credentials credentials = {
1204 .uid = msg.u.create_trace_chunk.credentials.value.uid,
1205 .gid = msg.u.create_trace_chunk.credentials.value.gid,
1206 };
1207 const bool is_local_trace =
1208 !msg.u.create_trace_chunk.relayd_id.is_set;
1209 const uint64_t relayd_id =
1210 msg.u.create_trace_chunk.relayd_id.value;
1211 const char *chunk_override_name =
1212 *msg.u.create_trace_chunk.override_name ?
1213 msg.u.create_trace_chunk.override_name :
1214 NULL;
1215 LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle =
1216 LTTNG_OPTIONAL_INIT;
1217
1218 /*
1219 * The session daemon will only provide a chunk directory file
1220 * descriptor for local traces.
1221 */
1222 if (is_local_trace) {
1223 int chunk_dirfd;
1224
1225 /* Acknowledge the reception of the command. */
1226 ret = consumer_send_status_msg(sock,
1227 LTTCOMM_CONSUMERD_SUCCESS);
1228 if (ret < 0) {
1229 /* Somehow, the session daemon is not responding anymore. */
1230 goto end_nosignal;
1231 }
1232
1233 ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
1234 if (ret != sizeof(chunk_dirfd)) {
1235 ERR("Failed to receive trace chunk directory file descriptor");
1236 goto error_fatal;
1237 }
1238
1239 DBG("Received trace chunk directory fd (%d)",
1240 chunk_dirfd);
1241 ret = lttng_directory_handle_init_from_dirfd(
1242 &chunk_directory_handle.value,
1243 chunk_dirfd);
1244 if (ret) {
1245 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1246 if (close(chunk_dirfd)) {
1247 PERROR("Failed to close chunk directory file descriptor");
1248 }
1249 goto error_fatal;
1250 }
1251 chunk_directory_handle.is_set = true;
1252 }
1253
1254 ret_code = lttng_consumer_create_trace_chunk(
1255 !is_local_trace ? &relayd_id : NULL,
1256 msg.u.create_trace_chunk.session_id,
1257 msg.u.create_trace_chunk.chunk_id,
1258 (time_t) msg.u.create_trace_chunk
1259 .creation_timestamp,
1260 chunk_override_name,
1261 msg.u.create_trace_chunk.credentials.is_set ?
1262 &credentials :
1263 NULL,
1264 chunk_directory_handle.is_set ?
1265 &chunk_directory_handle.value :
1266 NULL);
1267
1268 if (chunk_directory_handle.is_set) {
1269 lttng_directory_handle_fini(
1270 &chunk_directory_handle.value);
1271 }
1272 goto end_msg_sessiond;
1273 }
1274 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
1275 {
1276 enum lttng_trace_chunk_command_type close_command =
1277 msg.u.close_trace_chunk.close_command.value;
1278 const uint64_t relayd_id =
1279 msg.u.close_trace_chunk.relayd_id.value;
1280 struct lttcomm_consumer_close_trace_chunk_reply reply;
1281 char path[LTTNG_PATH_MAX];
1282
1283 ret_code = lttng_consumer_close_trace_chunk(
1284 msg.u.close_trace_chunk.relayd_id.is_set ?
1285 &relayd_id :
1286 NULL,
1287 msg.u.close_trace_chunk.session_id,
1288 msg.u.close_trace_chunk.chunk_id,
1289 (time_t) msg.u.close_trace_chunk.close_timestamp,
1290 msg.u.close_trace_chunk.close_command.is_set ?
1291 &close_command :
1292 NULL, path);
1293 reply.ret_code = ret_code;
1294 reply.path_length = strlen(path) + 1;
1295 ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1296 if (ret != sizeof(reply)) {
1297 goto error_fatal;
1298 }
1299 ret = lttcomm_send_unix_sock(sock, path, reply.path_length);
1300 if (ret != reply.path_length) {
1301 goto error_fatal;
1302 }
1303 goto end_nosignal;
1304 }
1305 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
1306 {
1307 const uint64_t relayd_id =
1308 msg.u.trace_chunk_exists.relayd_id.value;
1309
1310 ret_code = lttng_consumer_trace_chunk_exists(
1311 msg.u.trace_chunk_exists.relayd_id.is_set ?
1312 &relayd_id : NULL,
1313 msg.u.trace_chunk_exists.session_id,
1314 msg.u.trace_chunk_exists.chunk_id);
1315 goto end_msg_sessiond;
1316 }
1317 default:
1318 goto end_nosignal;
1319 }
1320
1321 end_nosignal:
1322 /*
1323 * Return 1 to indicate success since the 0 value can be a socket
1324 * shutdown during the recv() or send() call.
1325 */
1326 ret = 1;
1327 goto end;
1328 error_fatal:
1329 /* This will issue a consumer stop. */
1330 ret = -1;
1331 goto end;
1332 end_msg_sessiond:
1333 /*
1334 * The returned value here is not useful since either way we'll return 1 to
1335 * the caller because the session daemon socket management is done
1336 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1337 */
1338 ret = consumer_send_status_msg(sock, ret_code);
1339 if (ret < 0) {
1340 goto error_fatal;
1341 }
1342 ret = 1;
1343 end:
1344 health_code_update();
1345 rcu_read_unlock();
1346 return ret;
1347 }
1348
1349 /*
1350 * Populate index values of a kernel stream. Values are set in big endian order.
1351 *
1352 * Return 0 on success or else a negative value.
1353 */
1354 static int get_index_values(struct ctf_packet_index *index, int infd)
1355 {
1356 int ret;
1357 uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
1358 events_discarded, stream_id, stream_instance_id,
1359 packet_seq_num;
1360
1361 ret = kernctl_get_timestamp_begin(infd, &timestamp_begin);
1362 if (ret < 0) {
1363 PERROR("kernctl_get_timestamp_begin");
1364 goto error;
1365 }
1366
1367 ret = kernctl_get_timestamp_end(infd, &timestamp_end);
1368 if (ret < 0) {
1369 PERROR("kernctl_get_timestamp_end");
1370 goto error;
1371 }
1372
1373 ret = kernctl_get_events_discarded(infd, &events_discarded);
1374 if (ret < 0) {
1375 PERROR("kernctl_get_events_discarded");
1376 goto error;
1377 }
1378
1379 ret = kernctl_get_content_size(infd, &content_size);
1380 if (ret < 0) {
1381 PERROR("kernctl_get_content_size");
1382 goto error;
1383 }
1384
1385 ret = kernctl_get_packet_size(infd, &packet_size);
1386 if (ret < 0) {
1387 PERROR("kernctl_get_packet_size");
1388 goto error;
1389 }
1390
1391 ret = kernctl_get_stream_id(infd, &stream_id);
1392 if (ret < 0) {
1393 PERROR("kernctl_get_stream_id");
1394 goto error;
1395 }
1396
1397 ret = kernctl_get_instance_id(infd, &stream_instance_id);
1398 if (ret < 0) {
1399 if (ret == -ENOTTY) {
1400 /* Command not implemented by lttng-modules. */
1401 stream_instance_id = -1ULL;
1402 } else {
1403 PERROR("kernctl_get_instance_id");
1404 goto error;
1405 }
1406 }
1407
1408 ret = kernctl_get_sequence_number(infd, &packet_seq_num);
1409 if (ret < 0) {
1410 if (ret == -ENOTTY) {
1411 /* Command not implemented by lttng-modules. */
1412 packet_seq_num = -1ULL;
1413 ret = 0;
1414 } else {
1415 PERROR("kernctl_get_sequence_number");
1416 goto error;
1417 }
1418 }
1419 index->packet_seq_num = htobe64(index->packet_seq_num);
1420
1421 *index = (typeof(*index)) {
1422 .offset = index->offset,
1423 .packet_size = htobe64(packet_size),
1424 .content_size = htobe64(content_size),
1425 .timestamp_begin = htobe64(timestamp_begin),
1426 .timestamp_end = htobe64(timestamp_end),
1427 .events_discarded = htobe64(events_discarded),
1428 .stream_id = htobe64(stream_id),
1429 .stream_instance_id = htobe64(stream_instance_id),
1430 .packet_seq_num = htobe64(packet_seq_num),
1431 };
1432
1433 error:
1434 return ret;
1435 }
1436 /*
1437 * Sync metadata meaning request them to the session daemon and snapshot to the
1438 * metadata thread can consumer them.
1439 *
1440 * Metadata stream lock MUST be acquired.
1441 *
1442 * Return 0 if new metadatda is available, EAGAIN if the metadata stream
1443 * is empty or a negative value on error.
1444 */
1445 int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
1446 {
1447 int ret;
1448
1449 assert(metadata);
1450
1451 ret = kernctl_buffer_flush(metadata->wait_fd);
1452 if (ret < 0) {
1453 ERR("Failed to flush kernel stream");
1454 goto end;
1455 }
1456
1457 ret = kernctl_snapshot(metadata->wait_fd);
1458 if (ret < 0) {
1459 if (ret != -EAGAIN) {
1460 ERR("Sync metadata, taking kernel snapshot failed.");
1461 goto end;
1462 }
1463 DBG("Sync metadata, no new kernel metadata");
1464 /* No new metadata, exit. */
1465 ret = ENODATA;
1466 goto end;
1467 }
1468
1469 end:
1470 return ret;
1471 }
1472
1473 static
1474 int update_stream_stats(struct lttng_consumer_stream *stream)
1475 {
1476 int ret;
1477 uint64_t seq, discarded;
1478
1479 ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
1480 if (ret < 0) {
1481 if (ret == -ENOTTY) {
1482 /* Command not implemented by lttng-modules. */
1483 seq = -1ULL;
1484 stream->sequence_number_unavailable = true;
1485 } else {
1486 PERROR("kernctl_get_sequence_number");
1487 goto end;
1488 }
1489 }
1490
1491 /*
1492 * Start the sequence when we extract the first packet in case we don't
1493 * start at 0 (for example if a consumer is not connected to the
1494 * session immediately after the beginning).
1495 */
1496 if (stream->last_sequence_number == -1ULL) {
1497 stream->last_sequence_number = seq;
1498 } else if (seq > stream->last_sequence_number) {
1499 stream->chan->lost_packets += seq -
1500 stream->last_sequence_number - 1;
1501 } else {
1502 /* seq <= last_sequence_number */
1503 ERR("Sequence number inconsistent : prev = %" PRIu64
1504 ", current = %" PRIu64,
1505 stream->last_sequence_number, seq);
1506 ret = -1;
1507 goto end;
1508 }
1509 stream->last_sequence_number = seq;
1510
1511 ret = kernctl_get_events_discarded(stream->wait_fd, &discarded);
1512 if (ret < 0) {
1513 PERROR("kernctl_get_events_discarded");
1514 goto end;
1515 }
1516 if (discarded < stream->last_discarded_events) {
1517 /*
1518 * Overflow has occurred. We assume only one wrap-around
1519 * has occurred.
1520 */
1521 stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) -
1522 stream->last_discarded_events + discarded;
1523 } else {
1524 stream->chan->discarded_events += discarded -
1525 stream->last_discarded_events;
1526 }
1527 stream->last_discarded_events = discarded;
1528 ret = 0;
1529
1530 end:
1531 return ret;
1532 }
1533
1534 /*
1535 * Check if the local version of the metadata stream matches with the version
1536 * of the metadata stream in the kernel. If it was updated, set the reset flag
1537 * on the stream.
1538 */
1539 static
1540 int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream)
1541 {
1542 int ret;
1543 uint64_t cur_version;
1544
1545 ret = kernctl_get_metadata_version(infd, &cur_version);
1546 if (ret < 0) {
1547 if (ret == -ENOTTY) {
1548 /*
1549 * LTTng-modules does not implement this
1550 * command.
1551 */
1552 ret = 0;
1553 goto end;
1554 }
1555 ERR("Failed to get the metadata version");
1556 goto end;
1557 }
1558
1559 if (stream->metadata_version == cur_version) {
1560 ret = 0;
1561 goto end;
1562 }
1563
1564 DBG("New metadata version detected");
1565 stream->metadata_version = cur_version;
1566 stream->reset_metadata_flag = 1;
1567 ret = 0;
1568
1569 end:
1570 return ret;
1571 }
1572
1573 /*
1574 * Consume data on a file descriptor and write it on a trace file.
1575 * The stream and channel locks must be held by the caller.
1576 */
1577 ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
1578 struct lttng_consumer_local_data *ctx)
1579 {
1580 unsigned long len, subbuf_size, padding;
1581 int err, write_index = 1, rotation_ret;
1582 ssize_t ret = 0;
1583 int infd = stream->wait_fd;
1584 struct ctf_packet_index index = {};
1585
1586 DBG("In read_subbuffer (infd : %d)", infd);
1587
1588 /*
1589 * If the stream was flagged to be ready for rotation before we extract the
1590 * next packet, rotate it now.
1591 */
1592 if (stream->rotate_ready) {
1593 DBG("Rotate stream before extracting data");
1594 rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
1595 if (rotation_ret < 0) {
1596 ERR("Stream rotation error");
1597 ret = -1;
1598 goto error;
1599 }
1600 }
1601
1602 /* Get the next subbuffer */
1603 err = kernctl_get_next_subbuf(infd);
1604 if (err != 0) {
1605 /*
1606 * This is a debug message even for single-threaded consumer,
1607 * because poll() have more relaxed criterions than get subbuf,
1608 * so get_subbuf may fail for short race windows where poll()
1609 * would issue wakeups.
1610 */
1611 DBG("Reserving sub buffer failed (everything is normal, "
1612 "it is due to concurrency)");
1613 ret = err;
1614 goto error;
1615 }
1616
1617 /* Get the full subbuffer size including padding */
1618 err = kernctl_get_padded_subbuf_size(infd, &len);
1619 if (err != 0) {
1620 PERROR("Getting sub-buffer len failed.");
1621 err = kernctl_put_subbuf(infd);
1622 if (err != 0) {
1623 if (err == -EFAULT) {
1624 PERROR("Error in unreserving sub buffer\n");
1625 } else if (err == -EIO) {
1626 /* Should never happen with newer LTTng versions */
1627 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
1628 }
1629 ret = err;
1630 goto error;
1631 }
1632 ret = err;
1633 goto error;
1634 }
1635
1636 if (!stream->metadata_flag) {
1637 ret = get_index_values(&index, infd);
1638 if (ret < 0) {
1639 err = kernctl_put_subbuf(infd);
1640 if (err != 0) {
1641 if (err == -EFAULT) {
1642 PERROR("Error in unreserving sub buffer\n");
1643 } else if (err == -EIO) {
1644 /* Should never happen with newer LTTng versions */
1645 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
1646 }
1647 ret = err;
1648 goto error;
1649 }
1650 goto error;
1651 }
1652 ret = update_stream_stats(stream);
1653 if (ret < 0) {
1654 err = kernctl_put_subbuf(infd);
1655 if (err != 0) {
1656 if (err == -EFAULT) {
1657 PERROR("Error in unreserving sub buffer\n");
1658 } else if (err == -EIO) {
1659 /* Should never happen with newer LTTng versions */
1660 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
1661 }
1662 ret = err;
1663 goto error;
1664 }
1665 goto error;
1666 }
1667 } else {
1668 write_index = 0;
1669 ret = metadata_stream_check_version(infd, stream);
1670 if (ret < 0) {
1671 err = kernctl_put_subbuf(infd);
1672 if (err != 0) {
1673 if (err == -EFAULT) {
1674 PERROR("Error in unreserving sub buffer\n");
1675 } else if (err == -EIO) {
1676 /* Should never happen with newer LTTng versions */
1677 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
1678 }
1679 ret = err;
1680 goto error;
1681 }
1682 goto error;
1683 }
1684 }
1685
1686 switch (stream->chan->output) {
1687 case CONSUMER_CHANNEL_SPLICE:
1688 /*
1689 * XXX: The lttng-modules splice "actor" does not handle copying
1690 * partial pages hence only using the subbuffer size without the
1691 * padding makes the splice fail.
1692 */
1693 subbuf_size = len;
1694 padding = 0;
1695
1696 /* splice the subbuffer to the tracefile */
1697 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size,
1698 padding, &index);
1699 /*
1700 * XXX: Splice does not support network streaming so the return value
1701 * is simply checked against subbuf_size and not like the mmap() op.
1702 */
1703 if (ret != subbuf_size) {
1704 /*
1705 * display the error but continue processing to try
1706 * to release the subbuffer
1707 */
1708 ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
1709 ret, subbuf_size);
1710 write_index = 0;
1711 }
1712 break;
1713 case CONSUMER_CHANNEL_MMAP:
1714 {
1715 const char *subbuf_addr;
1716
1717 /* Get subbuffer size without padding */
1718 err = kernctl_get_subbuf_size(infd, &subbuf_size);
1719 if (err != 0) {
1720 PERROR("Getting sub-buffer len failed.");
1721 err = kernctl_put_subbuf(infd);
1722 if (err != 0) {
1723 if (err == -EFAULT) {
1724 PERROR("Error in unreserving sub buffer\n");
1725 } else if (err == -EIO) {
1726 /* Should never happen with newer LTTng versions */
1727 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
1728 }
1729 ret = err;
1730 goto error;
1731 }
1732 ret = err;
1733 goto error;
1734 }
1735
1736 ret = get_current_subbuf_addr(stream, &subbuf_addr);
1737 if (ret) {
1738 goto error_put_subbuf;
1739 }
1740
1741 /* Make sure the tracer is not gone mad on us! */
1742 assert(len >= subbuf_size);
1743
1744 padding = len - subbuf_size;
1745
1746 /* write the subbuffer to the tracefile */
1747 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
1748 subbuf_addr,
1749 subbuf_size,
1750 padding, &index);
1751 /*
1752 * The mmap operation should write subbuf_size amount of data when
1753 * network streaming or the full padding (len) size when we are _not_
1754 * streaming.
1755 */
1756 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
1757 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
1758 /*
1759 * Display the error but continue processing to try to release the
1760 * subbuffer. This is a DBG statement since this is possible to
1761 * happen without being a critical error.
1762 */
1763 DBG("Error writing to tracefile "
1764 "(ret: %zd != len: %lu != subbuf_size: %lu)",
1765 ret, len, subbuf_size);
1766 write_index = 0;
1767 }
1768 break;
1769 }
1770 default:
1771 ERR("Unknown output method");
1772 ret = -EPERM;
1773 }
1774 error_put_subbuf:
1775 err = kernctl_put_next_subbuf(infd);
1776 if (err != 0) {
1777 if (err == -EFAULT) {
1778 PERROR("Error in unreserving sub buffer\n");
1779 } else if (err == -EIO) {
1780 /* Should never happen with newer LTTng versions */
1781 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
1782 }
1783 ret = err;
1784 goto error;
1785 }
1786
1787 /* Write index if needed. */
1788 if (!write_index) {
1789 goto rotate;
1790 }
1791
1792 if (stream->chan->live_timer_interval && !stream->metadata_flag) {
1793 /*
1794 * In live, block until all the metadata is sent.
1795 */
1796 pthread_mutex_lock(&stream->metadata_timer_lock);
1797 assert(!stream->missed_metadata_flush);
1798 stream->waiting_on_metadata = true;
1799 pthread_mutex_unlock(&stream->metadata_timer_lock);
1800
1801 err = consumer_stream_sync_metadata(ctx, stream->session_id);
1802
1803 pthread_mutex_lock(&stream->metadata_timer_lock);
1804 stream->waiting_on_metadata = false;
1805 if (stream->missed_metadata_flush) {
1806 stream->missed_metadata_flush = false;
1807 pthread_mutex_unlock(&stream->metadata_timer_lock);
1808 (void) consumer_flush_kernel_index(stream);
1809 } else {
1810 pthread_mutex_unlock(&stream->metadata_timer_lock);
1811 }
1812 if (err < 0) {
1813 goto error;
1814 }
1815 }
1816
1817 err = consumer_stream_write_index(stream, &index);
1818 if (err < 0) {
1819 goto error;
1820 }
1821
1822 rotate:
1823 /*
1824 * After extracting the packet, we check if the stream is now ready to be
1825 * rotated and perform the action immediately.
1826 */
1827 rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
1828 if (rotation_ret == 1) {
1829 rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
1830 if (rotation_ret < 0) {
1831 ERR("Stream rotation error");
1832 ret = -1;
1833 goto error;
1834 }
1835 } else if (rotation_ret < 0) {
1836 ERR("Checking if stream is ready to rotate");
1837 ret = -1;
1838 goto error;
1839 }
1840
1841 error:
1842 return ret;
1843 }
1844
1845 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1846 {
1847 int ret;
1848
1849 assert(stream);
1850
1851 /*
1852 * Don't create anything if this is set for streaming or if there is
1853 * no current trace chunk on the parent channel.
1854 */
1855 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1856 stream->chan->trace_chunk) {
1857 ret = consumer_stream_create_output_files(stream, true);
1858 if (ret) {
1859 goto error;
1860 }
1861 }
1862
1863 if (stream->output == LTTNG_EVENT_MMAP) {
1864 /* get the len of the mmap region */
1865 unsigned long mmap_len;
1866
1867 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1868 if (ret != 0) {
1869 PERROR("kernctl_get_mmap_len");
1870 goto error_close_fd;
1871 }
1872 stream->mmap_len = (size_t) mmap_len;
1873
1874 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1875 MAP_PRIVATE, stream->wait_fd, 0);
1876 if (stream->mmap_base == MAP_FAILED) {
1877 PERROR("Error mmaping");
1878 ret = -1;
1879 goto error_close_fd;
1880 }
1881 }
1882
1883 /* we return 0 to let the library handle the FD internally */
1884 return 0;
1885
1886 error_close_fd:
1887 if (stream->out_fd >= 0) {
1888 int err;
1889
1890 err = close(stream->out_fd);
1891 assert(!err);
1892 stream->out_fd = -1;
1893 }
1894 error:
1895 return ret;
1896 }
1897
1898 /*
1899 * Check if data is still being extracted from the buffers for a specific
1900 * stream. Consumer data lock MUST be acquired before calling this function
1901 * and the stream lock.
1902 *
1903 * Return 1 if the traced data are still getting read else 0 meaning that the
1904 * data is available for trace viewer reading.
1905 */
1906 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
1907 {
1908 int ret;
1909
1910 assert(stream);
1911
1912 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1913 ret = 0;
1914 goto end;
1915 }
1916
1917 ret = kernctl_get_next_subbuf(stream->wait_fd);
1918 if (ret == 0) {
1919 /* There is still data so let's put back this subbuffer. */
1920 ret = kernctl_put_subbuf(stream->wait_fd);
1921 assert(ret == 0);
1922 ret = 1; /* Data is pending */
1923 goto end;
1924 }
1925
1926 /* Data is NOT pending and ready to be read. */
1927 ret = 0;
1928
1929 end:
1930 return ret;
1931 }
This page took 0.110101 seconds and 4 git commands to generate.