551d8579a21ad4c6e86bed6904df6290c4304d24
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/mman.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <unistd.h>
29 #include <sys/stat.h>
30
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/sessiond-comm.h>
34 #include <common/sessiond-comm/relayd.h>
35 #include <common/compat/fcntl.h>
36 #include <common/relayd/relayd.h>
37
38 #include "kernel-consumer.h"
39
40 extern struct lttng_consumer_global_data consumer_data;
41 extern int consumer_poll_timeout;
42 extern volatile int consumer_quit;
43
44 /*
45 * Mmap the ring buffer, read it and write the data to the tracefile.
46 *
47 * Returns the number of bytes written
48 */
49 ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
50 struct lttng_consumer_local_data *ctx,
51 struct lttng_consumer_stream *stream, unsigned long len)
52 {
53 unsigned long mmap_offset;
54 ssize_t ret = 0, written = 0;
55 off_t orig_offset = stream->out_fd_offset;
56 int fd = stream->wait_fd;
57 /* Default is on the disk */
58 int outfd = stream->out_fd;
59 uint64_t metadata_id;
60 struct consumer_relayd_sock_pair *relayd = NULL;
61
62 /* RCU lock for the relayd pointer */
63 rcu_read_lock();
64
65 /* Flag that the current stream if set for network streaming. */
66 if (stream->net_seq_idx != -1) {
67 relayd = consumer_find_relayd(stream->net_seq_idx);
68 if (relayd == NULL) {
69 goto end;
70 }
71 }
72
73 /* get the offset inside the fd to mmap */
74 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
75 if (ret != 0) {
76 errno = -ret;
77 perror("kernctl_get_mmap_read_offset");
78 written = ret;
79 goto end;
80 }
81
82 /* Handle stream on the relayd if the output is on the network */
83 if (relayd) {
84 /*
85 * Lock the control socket for the complete duration of the function
86 * since from this point on we will use the socket.
87 */
88 if (stream->metadata_flag) {
89 /* Metadata requires the control socket. */
90 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
91 }
92
93 ret = consumer_handle_stream_before_relayd(stream, len);
94 if (ret >= 0) {
95 /* Use the returned socket. */
96 outfd = ret;
97
98 /* Write metadata stream id before payload */
99 if (stream->metadata_flag) {
100 metadata_id = htobe64(stream->relayd_stream_id);
101 do {
102 ret = write(outfd, (void *) &metadata_id,
103 sizeof(stream->relayd_stream_id));
104 } while (ret < 0 && errno == EINTR);
105 if (ret < 0) {
106 PERROR("write metadata stream id");
107 written = ret;
108 goto end;
109 }
110 DBG("Metadata stream id %zu written before data",
111 stream->relayd_stream_id);
112 /*
113 * We do this so the return value can match the len passed as
114 * argument to this function.
115 */
116 written -= sizeof(stream->relayd_stream_id);
117 }
118 }
119 /* Else, use the default set before which is the filesystem. */
120 }
121
122 while (len > 0) {
123 do {
124 ret = write(outfd, stream->mmap_base + mmap_offset, len);
125 } while (ret < 0 && errno == EINTR);
126 if (ret < 0) {
127 perror("Error in file write");
128 if (written == 0) {
129 written = ret;
130 }
131 goto end;
132 } else if (ret > len) {
133 perror("Error in file write");
134 written += ret;
135 goto end;
136 } else {
137 len -= ret;
138 mmap_offset += ret;
139 }
140
141 /* This call is useless on a socket so better save a syscall. */
142 if (!relayd) {
143 /* This won't block, but will start writeout asynchronously */
144 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
145 SYNC_FILE_RANGE_WRITE);
146 stream->out_fd_offset += ret;
147 }
148 written += ret;
149 }
150 lttng_consumer_sync_trace_file(stream, orig_offset);
151
152 end:
153 /* Unlock only if ctrl socket used */
154 if (relayd && stream->metadata_flag) {
155 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
156 }
157
158 rcu_read_unlock();
159
160 return written;
161 }
162
163 /*
164 * Splice the data from the ring buffer to the tracefile.
165 *
166 * Returns the number of bytes spliced.
167 */
168 ssize_t lttng_kconsumer_on_read_subbuffer_splice(
169 struct lttng_consumer_local_data *ctx,
170 struct lttng_consumer_stream *stream, unsigned long len)
171 {
172 ssize_t ret = 0, written = 0, ret_splice = 0;
173 loff_t offset = 0;
174 off_t orig_offset = stream->out_fd_offset;
175 int fd = stream->wait_fd;
176 /* Default is on the disk */
177 int outfd = stream->out_fd;
178 uint64_t metadata_id;
179 struct consumer_relayd_sock_pair *relayd = NULL;
180
181 /* RCU lock for the relayd pointer */
182 rcu_read_lock();
183
184 /* Flag that the current stream if set for network streaming. */
185 if (stream->net_seq_idx != -1) {
186 relayd = consumer_find_relayd(stream->net_seq_idx);
187 if (relayd == NULL) {
188 goto end;
189 }
190 }
191
192 /* Write metadata stream id before payload */
193 if (stream->metadata_flag && relayd) {
194 /*
195 * Lock the control socket for the complete duration of the function
196 * since from this point on we will use the socket.
197 */
198 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
199
200 metadata_id = htobe64(stream->relayd_stream_id);
201 do {
202 ret = write(ctx->consumer_thread_pipe[1],
203 (void *) &metadata_id,
204 sizeof(stream->relayd_stream_id));
205 } while (ret < 0 && errno == EINTR);
206 if (ret < 0) {
207 PERROR("write metadata stream id");
208 written = ret;
209 goto end;
210 }
211 DBG("Metadata stream id %zu written before data",
212 stream->relayd_stream_id);
213 }
214
215 while (len > 0) {
216 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
217 (unsigned long)offset, len, fd);
218 ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
219 SPLICE_F_MOVE | SPLICE_F_MORE);
220 DBG("splice chan to pipe, ret %zd", ret_splice);
221 if (ret_splice < 0) {
222 perror("Error in relay splice");
223 if (written == 0) {
224 written = ret_splice;
225 }
226 ret = errno;
227 goto splice_error;
228 }
229
230 /* Handle stream on the relayd if the output is on the network */
231 if (relayd) {
232 if (stream->metadata_flag) {
233 /* Update counter to fit the spliced data */
234 ret_splice += sizeof(stream->relayd_stream_id);
235 len += sizeof(stream->relayd_stream_id);
236 /*
237 * We do this so the return value can match the len passed as
238 * argument to this function.
239 */
240 written -= sizeof(stream->relayd_stream_id);
241 }
242
243 ret = consumer_handle_stream_before_relayd(stream, ret_splice);
244 if (ret >= 0) {
245 /* Use the returned socket. */
246 outfd = ret;
247 } else {
248 if (outfd == -1) {
249 ERR("Remote relayd disconnected. Stopping");
250 goto end;
251 }
252 }
253 }
254
255 DBG3("Kernel consumer splice data in %d to out %d",
256 ctx->consumer_thread_pipe[0], outfd);
257 ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
258 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
259 DBG("splice pipe to file, ret %zd", ret_splice);
260 if (ret_splice < 0) {
261 perror("Error in file splice");
262 if (written == 0) {
263 written = ret_splice;
264 }
265 ret = errno;
266 goto splice_error;
267 }
268 if (ret_splice > len) {
269 errno = EINVAL;
270 PERROR("Wrote more data than requested %zd (len: %lu)",
271 ret_splice, len);
272 written += ret_splice;
273 ret = errno;
274 goto splice_error;
275 }
276 len -= ret_splice;
277
278 /* This call is useless on a socket so better save a syscall. */
279 if (!relayd) {
280 /* This won't block, but will start writeout asynchronously */
281 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
282 SYNC_FILE_RANGE_WRITE);
283 stream->out_fd_offset += ret_splice;
284 }
285 written += ret_splice;
286 }
287 lttng_consumer_sync_trace_file(stream, orig_offset);
288
289 ret = ret_splice;
290
291 goto end;
292
293 splice_error:
294 /* send the appropriate error description to sessiond */
295 switch (ret) {
296 case EBADF:
297 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
298 break;
299 case EINVAL:
300 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
301 break;
302 case ENOMEM:
303 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
304 break;
305 case ESPIPE:
306 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
307 break;
308 }
309
310 end:
311 if (relayd && stream->metadata_flag) {
312 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
313 }
314
315 rcu_read_unlock();
316
317 return written;
318 }
319
320 /*
321 * Take a snapshot for a specific fd
322 *
323 * Returns 0 on success, < 0 on error
324 */
325 int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
326 struct lttng_consumer_stream *stream)
327 {
328 int ret = 0;
329 int infd = stream->wait_fd;
330
331 ret = kernctl_snapshot(infd);
332 if (ret != 0) {
333 errno = -ret;
334 perror("Getting sub-buffer snapshot.");
335 }
336
337 return ret;
338 }
339
340 /*
341 * Get the produced position
342 *
343 * Returns 0 on success, < 0 on error
344 */
345 int lttng_kconsumer_get_produced_snapshot(
346 struct lttng_consumer_local_data *ctx,
347 struct lttng_consumer_stream *stream,
348 unsigned long *pos)
349 {
350 int ret;
351 int infd = stream->wait_fd;
352
353 ret = kernctl_snapshot_get_produced(infd, pos);
354 if (ret != 0) {
355 errno = -ret;
356 perror("kernctl_snapshot_get_produced");
357 }
358
359 return ret;
360 }
361
362 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
363 int sock, struct pollfd *consumer_sockpoll)
364 {
365 ssize_t ret;
366 struct lttcomm_consumer_msg msg;
367
368 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
369 if (ret != sizeof(msg)) {
370 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
371 return ret;
372 }
373 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
374 return -ENOENT;
375 }
376
377 /* relayd needs RCU read-side protection */
378 rcu_read_lock();
379
380 switch (msg.cmd_type) {
381 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
382 {
383 int fd;
384 struct consumer_relayd_sock_pair *relayd;
385
386 DBG("Consumer adding relayd socket");
387
388 /* Get relayd reference if exists. */
389 relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
390 if (relayd == NULL) {
391 /* Not found. Allocate one. */
392 relayd = consumer_allocate_relayd_sock_pair(
393 msg.u.relayd_sock.net_index);
394 if (relayd == NULL) {
395 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
396 goto end_nosignal;
397 }
398 }
399
400 /* Poll on consumer socket. */
401 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
402 return -EINTR;
403 }
404
405 /* Get relayd socket from session daemon */
406 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
407 if (ret != sizeof(fd)) {
408 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
409 goto end_nosignal;
410 }
411
412 /* Copy socket information and received FD */
413 switch (msg.u.relayd_sock.type) {
414 case LTTNG_STREAM_CONTROL:
415 /* Copy received lttcomm socket */
416 lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
417
418 ret = lttcomm_create_sock(&relayd->control_sock);
419 if (ret < 0) {
420 goto end_nosignal;
421 }
422
423 /* Close the created socket fd which is useless */
424 close(relayd->control_sock.fd);
425
426 /* Assign new file descriptor */
427 relayd->control_sock.fd = fd;
428 break;
429 case LTTNG_STREAM_DATA:
430 /* Copy received lttcomm socket */
431 lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
432 ret = lttcomm_create_sock(&relayd->data_sock);
433 if (ret < 0) {
434 goto end_nosignal;
435 }
436
437 /* Close the created socket fd which is useless */
438 close(relayd->data_sock.fd);
439
440 /* Assign new file descriptor */
441 relayd->data_sock.fd = fd;
442 break;
443 default:
444 ERR("Unknown relayd socket type");
445 goto end_nosignal;
446 }
447
448 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
449 msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
450 relayd->net_seq_idx, fd);
451
452 /*
453 * Add relayd socket pair to consumer data hashtable. If object already
454 * exists or on error, the function gracefully returns.
455 */
456 consumer_add_relayd(relayd);
457
458 goto end_nosignal;
459 }
460 case LTTNG_CONSUMER_ADD_CHANNEL:
461 {
462 struct lttng_consumer_channel *new_channel;
463
464 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
465 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
466 -1, -1,
467 msg.u.channel.mmap_len,
468 msg.u.channel.max_sb_size);
469 if (new_channel == NULL) {
470 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
471 goto end_nosignal;
472 }
473 if (ctx->on_recv_channel != NULL) {
474 ret = ctx->on_recv_channel(new_channel);
475 if (ret == 0) {
476 consumer_add_channel(new_channel);
477 } else if (ret < 0) {
478 goto end_nosignal;
479 }
480 } else {
481 consumer_add_channel(new_channel);
482 }
483 goto end_nosignal;
484 }
485 case LTTNG_CONSUMER_ADD_STREAM:
486 {
487 int fd;
488 struct consumer_relayd_sock_pair *relayd = NULL;
489 struct lttng_consumer_stream *new_stream;
490
491 /* block */
492 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
493 return -EINTR;
494 }
495
496 /* Get stream file descriptor from socket */
497 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
498 if (ret != sizeof(fd)) {
499 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
500 return ret;
501 }
502
503 new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
504 msg.u.stream.stream_key,
505 fd, fd,
506 msg.u.stream.state,
507 msg.u.stream.mmap_len,
508 msg.u.stream.output,
509 msg.u.stream.path_name,
510 msg.u.stream.uid,
511 msg.u.stream.gid,
512 msg.u.stream.net_index,
513 msg.u.stream.metadata_flag);
514 if (new_stream == NULL) {
515 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
516 goto end;
517 }
518
519 /* The stream is not metadata. Get relayd reference if exists. */
520 relayd = consumer_find_relayd(msg.u.stream.net_index);
521 if (relayd != NULL) {
522 /* Add stream on the relayd */
523 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
524 ret = relayd_add_stream(&relayd->control_sock,
525 msg.u.stream.name, msg.u.stream.path_name,
526 &new_stream->relayd_stream_id);
527 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
528 if (ret < 0) {
529 goto end;
530 }
531 } else if (msg.u.stream.net_index != -1) {
532 ERR("Network sequence index %d unknown. Not adding stream.",
533 msg.u.stream.net_index);
534 free(new_stream);
535 goto end;
536 }
537
538 if (ctx->on_recv_stream != NULL) {
539 ret = ctx->on_recv_stream(new_stream);
540 if (ret == 0) {
541 consumer_add_stream(new_stream);
542 } else if (ret < 0) {
543 goto end;
544 }
545 } else {
546 consumer_add_stream(new_stream);
547 }
548
549 DBG("Kernel consumer_add_stream (%d)", fd);
550 break;
551 }
552 case LTTNG_CONSUMER_UPDATE_STREAM:
553 {
554 if (ctx->on_update_stream != NULL) {
555 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
556 if (ret == 0) {
557 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
558 } else if (ret < 0) {
559 goto end;
560 }
561 } else {
562 consumer_change_stream_state(msg.u.stream.stream_key,
563 msg.u.stream.state);
564 }
565 break;
566 }
567 default:
568 break;
569 }
570 end:
571 /*
572 * Wake-up the other end by writing a null byte in the pipe
573 * (non-blocking). Important note: Because writing into the
574 * pipe is non-blocking (and therefore we allow dropping wakeup
575 * data, as long as there is wakeup data present in the pipe
576 * buffer to wake up the other end), the other end should
577 * perform the following sequence for waiting:
578 * 1) empty the pipe (reads).
579 * 2) perform update operation.
580 * 3) wait on the pipe (poll).
581 */
582 do {
583 ret = write(ctx->consumer_poll_pipe[1], "", 1);
584 } while (ret < 0 && errno == EINTR);
585 end_nosignal:
586 rcu_read_unlock();
587 return 0;
588 }
589
590 /*
591 * Consume data on a file descriptor and write it on a trace file.
592 */
593 ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
594 struct lttng_consumer_local_data *ctx)
595 {
596 unsigned long len;
597 int err;
598 ssize_t ret = 0;
599 int infd = stream->wait_fd;
600
601 DBG("In read_subbuffer (infd : %d)", infd);
602 /* Get the next subbuffer */
603 err = kernctl_get_next_subbuf(infd);
604 if (err != 0) {
605 /*
606 * This is a debug message even for single-threaded consumer,
607 * because poll() have more relaxed criterions than get subbuf,
608 * so get_subbuf may fail for short race windows where poll()
609 * would issue wakeups.
610 */
611 DBG("Reserving sub buffer failed (everything is normal, "
612 "it is due to concurrency)");
613 goto end;
614 }
615
616 switch (stream->output) {
617 case LTTNG_EVENT_SPLICE:
618 /* read the whole subbuffer */
619 err = kernctl_get_padded_subbuf_size(infd, &len);
620 if (err != 0) {
621 errno = -ret;
622 perror("Getting sub-buffer len failed.");
623 goto end;
624 }
625
626 /* splice the subbuffer to the tracefile */
627 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
628 if (ret != len) {
629 /*
630 * display the error but continue processing to try
631 * to release the subbuffer
632 */
633 ERR("Error splicing to tracefile (ret: %ld != len: %ld)",
634 ret, len);
635 }
636
637 break;
638 case LTTNG_EVENT_MMAP:
639 /* read the used subbuffer size */
640 err = kernctl_get_padded_subbuf_size(infd, &len);
641 if (err != 0) {
642 errno = -ret;
643 perror("Getting sub-buffer len failed.");
644 goto end;
645 }
646 /* write the subbuffer to the tracefile */
647 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
648 if (ret != len) {
649 /*
650 * display the error but continue processing to try
651 * to release the subbuffer
652 */
653 ERR("Error writing to tracefile");
654 }
655 break;
656 default:
657 ERR("Unknown output method");
658 ret = -1;
659 }
660
661 err = kernctl_put_next_subbuf(infd);
662 if (err != 0) {
663 errno = -ret;
664 if (errno == EFAULT) {
665 perror("Error in unreserving sub buffer\n");
666 } else if (errno == EIO) {
667 /* Should never happen with newer LTTng versions */
668 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
669 }
670 goto end;
671 }
672
673 end:
674 return ret;
675 }
676
677 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
678 {
679 int ret;
680
681 /* Opening the tracefile in write mode */
682 if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
683 ret = run_as_open(stream->path_name,
684 O_WRONLY|O_CREAT|O_TRUNC,
685 S_IRWXU|S_IRWXG|S_IRWXO,
686 stream->uid, stream->gid);
687 if (ret < 0) {
688 ERR("Opening %s", stream->path_name);
689 perror("open");
690 goto error;
691 }
692 stream->out_fd = ret;
693 }
694
695 if (stream->output == LTTNG_EVENT_MMAP) {
696 /* get the len of the mmap region */
697 unsigned long mmap_len;
698
699 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
700 if (ret != 0) {
701 errno = -ret;
702 perror("kernctl_get_mmap_len");
703 goto error_close_fd;
704 }
705 stream->mmap_len = (size_t) mmap_len;
706
707 stream->mmap_base = mmap(NULL, stream->mmap_len,
708 PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
709 if (stream->mmap_base == MAP_FAILED) {
710 perror("Error mmaping");
711 ret = -1;
712 goto error_close_fd;
713 }
714 }
715
716 /* we return 0 to let the library handle the FD internally */
717 return 0;
718
719 error_close_fd:
720 {
721 int err;
722
723 err = close(stream->out_fd);
724 assert(!err);
725 }
726 error:
727 return ret;
728 }
729
This page took 0.043582 seconds and 3 git commands to generate.