Network streaming support
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/mman.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <unistd.h>
29 #include <sys/stat.h>
30
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/sessiond-comm.h>
34 #include <common/sessiond-comm/relayd.h>
35 #include <common/compat/fcntl.h>
36 #include <common/relayd/relayd.h>
37
38 #include "kernel-consumer.h"
39
40 extern struct lttng_consumer_global_data consumer_data;
41 extern int consumer_poll_timeout;
42 extern volatile int consumer_quit;
43
44 /*
45 * Mmap the ring buffer, read it and write the data to the tracefile.
46 *
47 * Returns the number of bytes written
48 */
49 ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
50 struct lttng_consumer_local_data *ctx,
51 struct lttng_consumer_stream *stream, unsigned long len)
52 {
53 unsigned long mmap_offset;
54 ssize_t ret = 0, written = 0;
55 off_t orig_offset = stream->out_fd_offset;
56 int fd = stream->wait_fd;
57 /* Default is on the disk */
58 int outfd = stream->out_fd;
59 uint64_t metadata_id;
60 struct consumer_relayd_sock_pair *relayd = NULL;
61
62 /* Flag that the current stream if set for network streaming. */
63 if (stream->net_seq_idx != -1) {
64 relayd = consumer_find_relayd(stream->net_seq_idx);
65 if (relayd == NULL) {
66 goto end;
67 }
68 }
69
70 /* get the offset inside the fd to mmap */
71 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
72 if (ret != 0) {
73 errno = -ret;
74 perror("kernctl_get_mmap_read_offset");
75 written = ret;
76 goto end;
77 }
78
79 /* RCU lock for the relayd pointer */
80 rcu_read_lock();
81
82 /* Handle stream on the relayd if the output is on the network */
83 if (relayd) {
84 /*
85 * Lock the control socket for the complete duration of the function
86 * since from this point on we will use the socket.
87 */
88 if (stream->metadata_flag) {
89 /* Metadata requires the control socket. */
90 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
91 }
92
93 ret = consumer_handle_stream_before_relayd(stream, len);
94 if (ret >= 0) {
95 /* Use the returned socket. */
96 outfd = ret;
97
98 /* Write metadata stream id before payload */
99 if (stream->metadata_flag) {
100 metadata_id = htobe64(stream->relayd_stream_id);
101 do {
102 ret = write(outfd, (void *) &metadata_id,
103 sizeof(stream->relayd_stream_id));
104 if (ret < 0) {
105 PERROR("write metadata stream id");
106 written = ret;
107 goto end;
108 }
109 } while (errno == EINTR);
110 DBG("Metadata stream id %zu written before data",
111 stream->relayd_stream_id);
112 /*
113 * We do this so the return value can match the len passed as
114 * argument to this function.
115 */
116 written -= sizeof(stream->relayd_stream_id);
117 }
118 }
119 /* Else, use the default set before which is the filesystem. */
120 }
121
122 while (len > 0) {
123 ret = write(outfd, stream->mmap_base + mmap_offset, len);
124 if (ret < 0) {
125 if (errno == EINTR) {
126 /* restart the interrupted system call */
127 continue;
128 } else {
129 perror("Error in file write");
130 if (written == 0) {
131 written = ret;
132 }
133 goto end;
134 }
135 } else if (ret > len) {
136 perror("Error in file write");
137 written += ret;
138 goto end;
139 } else {
140 len -= ret;
141 mmap_offset += ret;
142 }
143
144 /* This call is useless on a socket so better save a syscall. */
145 if (!relayd) {
146 /* This won't block, but will start writeout asynchronously */
147 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
148 SYNC_FILE_RANGE_WRITE);
149 stream->out_fd_offset += ret;
150 }
151 written += ret;
152 }
153 lttng_consumer_sync_trace_file(stream, orig_offset);
154
155 end:
156 /* Unlock only if ctrl socket used */
157 if (relayd && stream->metadata_flag) {
158 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
159 }
160
161 rcu_read_unlock();
162
163 return written;
164 }
165
166 /*
167 * Splice the data from the ring buffer to the tracefile.
168 *
169 * Returns the number of bytes spliced.
170 */
171 ssize_t lttng_kconsumer_on_read_subbuffer_splice(
172 struct lttng_consumer_local_data *ctx,
173 struct lttng_consumer_stream *stream, unsigned long len)
174 {
175 ssize_t ret = 0, written = 0, ret_splice = 0;
176 loff_t offset = 0;
177 off_t orig_offset = stream->out_fd_offset;
178 int fd = stream->wait_fd;
179 /* Default is on the disk */
180 int outfd = stream->out_fd;
181 uint64_t metadata_id;
182 struct consumer_relayd_sock_pair *relayd = NULL;
183
184 /* Flag that the current stream if set for network streaming. */
185 if (stream->net_seq_idx != -1) {
186 relayd = consumer_find_relayd(stream->net_seq_idx);
187 if (relayd == NULL) {
188 goto end;
189 }
190 }
191
192 /* RCU lock for the relayd pointer */
193 rcu_read_lock();
194
195 /* Write metadata stream id before payload */
196 if (stream->metadata_flag && relayd) {
197 /*
198 * Lock the control socket for the complete duration of the function
199 * since from this point on we will use the socket.
200 */
201 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
202
203 do {
204 metadata_id = htobe64(stream->relayd_stream_id);
205 ret = write(ctx->consumer_thread_pipe[1],
206 (void *) &metadata_id,
207 sizeof(stream->relayd_stream_id));
208 if (ret < 0) {
209 PERROR("write metadata stream id");
210 written = ret;
211 goto end;
212 }
213 } while (errno == EINTR);
214 DBG("Metadata stream id %zu written before data",
215 stream->relayd_stream_id);
216 }
217
218 while (len > 0) {
219 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
220 (unsigned long)offset, len, fd);
221 ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
222 SPLICE_F_MOVE | SPLICE_F_MORE);
223 DBG("splice chan to pipe, ret %zd", ret_splice);
224 if (ret_splice < 0) {
225 perror("Error in relay splice");
226 if (written == 0) {
227 written = ret_splice;
228 }
229 ret = errno;
230 goto splice_error;
231 }
232
233 /* Handle stream on the relayd if the output is on the network */
234 if (relayd) {
235 if (stream->metadata_flag) {
236 /* Update counter to fit the spliced data */
237 ret_splice += sizeof(stream->relayd_stream_id);
238 len += sizeof(stream->relayd_stream_id);
239 /*
240 * We do this so the return value can match the len passed as
241 * argument to this function.
242 */
243 written -= sizeof(stream->relayd_stream_id);
244 }
245
246 ret = consumer_handle_stream_before_relayd(stream, ret_splice);
247 if (ret >= 0) {
248 /* Use the returned socket. */
249 outfd = ret;
250 } else {
251 if (outfd == -1) {
252 ERR("Remote relayd disconnected. Stopping");
253 goto end;
254 }
255 }
256 }
257
258 DBG3("Kernel consumer splice data in %d to out %d",
259 ctx->consumer_thread_pipe[0], outfd);
260 ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
261 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
262 DBG("splice pipe to file, ret %zd", ret_splice);
263 if (ret_splice < 0) {
264 perror("Error in file splice");
265 if (written == 0) {
266 written = ret_splice;
267 }
268 ret = errno;
269 goto splice_error;
270 }
271 if (ret_splice > len) {
272 errno = EINVAL;
273 PERROR("Wrote more data than requested %zd (len: %lu)",
274 ret_splice, len);
275 written += ret_splice;
276 ret = errno;
277 goto splice_error;
278 }
279 len -= ret_splice;
280
281 /* This call is useless on a socket so better save a syscall. */
282 if (!relayd) {
283 /* This won't block, but will start writeout asynchronously */
284 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
285 SYNC_FILE_RANGE_WRITE);
286 stream->out_fd_offset += ret_splice;
287 }
288 written += ret_splice;
289 }
290 lttng_consumer_sync_trace_file(stream, orig_offset);
291
292 ret = ret_splice;
293
294 goto end;
295
296 splice_error:
297 /* send the appropriate error description to sessiond */
298 switch (ret) {
299 case EBADF:
300 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
301 break;
302 case EINVAL:
303 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
304 break;
305 case ENOMEM:
306 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
307 break;
308 case ESPIPE:
309 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
310 break;
311 }
312
313 end:
314 if (relayd && stream->metadata_flag) {
315 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
316 }
317
318 rcu_read_unlock();
319
320 return written;
321 }
322
323 /*
324 * Take a snapshot for a specific fd
325 *
326 * Returns 0 on success, < 0 on error
327 */
328 int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
329 struct lttng_consumer_stream *stream)
330 {
331 int ret = 0;
332 int infd = stream->wait_fd;
333
334 ret = kernctl_snapshot(infd);
335 if (ret != 0) {
336 errno = -ret;
337 perror("Getting sub-buffer snapshot.");
338 }
339
340 return ret;
341 }
342
343 /*
344 * Get the produced position
345 *
346 * Returns 0 on success, < 0 on error
347 */
348 int lttng_kconsumer_get_produced_snapshot(
349 struct lttng_consumer_local_data *ctx,
350 struct lttng_consumer_stream *stream,
351 unsigned long *pos)
352 {
353 int ret;
354 int infd = stream->wait_fd;
355
356 ret = kernctl_snapshot_get_produced(infd, pos);
357 if (ret != 0) {
358 errno = -ret;
359 perror("kernctl_snapshot_get_produced");
360 }
361
362 return ret;
363 }
364
365 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
366 int sock, struct pollfd *consumer_sockpoll)
367 {
368 ssize_t ret;
369 struct lttcomm_consumer_msg msg;
370
371 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
372 if (ret != sizeof(msg)) {
373 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
374 return ret;
375 }
376 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
377 return -ENOENT;
378 }
379
380 switch (msg.cmd_type) {
381 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
382 {
383 int fd;
384 struct consumer_relayd_sock_pair *relayd;
385
386 DBG("Consumer adding relayd socket");
387
388 /* Get relayd reference if exists. */
389 relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
390 if (relayd == NULL) {
391 /* Not found. Allocate one. */
392 relayd = consumer_allocate_relayd_sock_pair(
393 msg.u.relayd_sock.net_index);
394 if (relayd == NULL) {
395 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
396 goto end_nosignal;
397 }
398 }
399
400 /* Poll on consumer socket. */
401 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
402 return -EINTR;
403 }
404
405 /* Get relayd socket from session daemon */
406 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
407 if (ret != sizeof(fd)) {
408 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
409 goto end_nosignal;
410 }
411
412 /* Copy socket information and received FD */
413 switch (msg.u.relayd_sock.type) {
414 case LTTNG_STREAM_CONTROL:
415 /* Copy received lttcomm socket */
416 lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
417
418 ret = lttcomm_create_sock(&relayd->control_sock);
419 if (ret < 0) {
420 goto end_nosignal;
421 }
422
423 /* Close the created socket fd which is useless */
424 close(relayd->control_sock.fd);
425
426 /* Assign new file descriptor */
427 relayd->control_sock.fd = fd;
428 break;
429 case LTTNG_STREAM_DATA:
430 /* Copy received lttcomm socket */
431 lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
432 ret = lttcomm_create_sock(&relayd->data_sock);
433 if (ret < 0) {
434 goto end_nosignal;
435 }
436
437 /* Close the created socket fd which is useless */
438 close(relayd->data_sock.fd);
439
440 /* Assign new file descriptor */
441 relayd->data_sock.fd = fd;
442 break;
443 default:
444 ERR("Unknown relayd socket type");
445 goto end_nosignal;
446 }
447
448 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
449 msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
450 relayd->net_seq_idx, fd);
451
452 /*
453 * Add relayd socket pair to consumer data hashtable. If object already
454 * exists or on error, the function gracefully returns.
455 */
456 consumer_add_relayd(relayd);
457
458 goto end_nosignal;
459 }
460 case LTTNG_CONSUMER_ADD_CHANNEL:
461 {
462 struct lttng_consumer_channel *new_channel;
463
464 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
465 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
466 -1, -1,
467 msg.u.channel.mmap_len,
468 msg.u.channel.max_sb_size);
469 if (new_channel == NULL) {
470 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
471 goto end_nosignal;
472 }
473 if (ctx->on_recv_channel != NULL) {
474 ret = ctx->on_recv_channel(new_channel);
475 if (ret == 0) {
476 consumer_add_channel(new_channel);
477 } else if (ret < 0) {
478 goto end_nosignal;
479 }
480 } else {
481 consumer_add_channel(new_channel);
482 }
483 goto end_nosignal;
484 }
485 case LTTNG_CONSUMER_ADD_STREAM:
486 {
487 int fd;
488 struct consumer_relayd_sock_pair *relayd = NULL;
489 struct lttng_consumer_stream *new_stream;
490
491 /* block */
492 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
493 return -EINTR;
494 }
495
496 /* Get stream file descriptor from socket */
497 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
498 if (ret != sizeof(fd)) {
499 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
500 return ret;
501 }
502
503 new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
504 msg.u.stream.stream_key,
505 fd, fd,
506 msg.u.stream.state,
507 msg.u.stream.mmap_len,
508 msg.u.stream.output,
509 msg.u.stream.path_name,
510 msg.u.stream.uid,
511 msg.u.stream.gid,
512 msg.u.stream.net_index,
513 msg.u.stream.metadata_flag);
514 if (new_stream == NULL) {
515 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
516 goto end;
517 }
518
519 /* The stream is not metadata. Get relayd reference if exists. */
520 relayd = consumer_find_relayd(msg.u.stream.net_index);
521 if (relayd != NULL) {
522 /* Add stream on the relayd */
523 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
524 ret = relayd_add_stream(&relayd->control_sock,
525 msg.u.stream.name, msg.u.stream.path_name,
526 &new_stream->relayd_stream_id);
527 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
528 if (ret < 0) {
529 goto end;
530 }
531 } else if (msg.u.stream.net_index != -1) {
532 ERR("Network sequence index %d unknown. Not adding stream.",
533 msg.u.stream.net_index);
534 free(new_stream);
535 goto end;
536 }
537
538 if (ctx->on_recv_stream != NULL) {
539 ret = ctx->on_recv_stream(new_stream);
540 if (ret == 0) {
541 consumer_add_stream(new_stream);
542 } else if (ret < 0) {
543 goto end;
544 }
545 } else {
546 consumer_add_stream(new_stream);
547 }
548
549 DBG("Kernel consumer_add_stream (%d)", fd);
550 break;
551 }
552 case LTTNG_CONSUMER_UPDATE_STREAM:
553 {
554 if (ctx->on_update_stream != NULL) {
555 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
556 if (ret == 0) {
557 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
558 } else if (ret < 0) {
559 goto end;
560 }
561 } else {
562 consumer_change_stream_state(msg.u.stream.stream_key,
563 msg.u.stream.state);
564 }
565 break;
566 }
567 default:
568 break;
569 }
570 end:
571 /*
572 * Wake-up the other end by writing a null byte in the pipe
573 * (non-blocking). Important note: Because writing into the
574 * pipe is non-blocking (and therefore we allow dropping wakeup
575 * data, as long as there is wakeup data present in the pipe
576 * buffer to wake up the other end), the other end should
577 * perform the following sequence for waiting:
578 * 1) empty the pipe (reads).
579 * 2) perform update operation.
580 * 3) wait on the pipe (poll).
581 */
582 do {
583 ret = write(ctx->consumer_poll_pipe[1], "", 1);
584 } while (ret == -1UL && errno == EINTR);
585 end_nosignal:
586 return 0;
587 }
588
589 /*
590 * Consume data on a file descriptor and write it on a trace file.
591 */
592 ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
593 struct lttng_consumer_local_data *ctx)
594 {
595 unsigned long len;
596 int err;
597 ssize_t ret = 0;
598 int infd = stream->wait_fd;
599
600 DBG("In read_subbuffer (infd : %d)", infd);
601 /* Get the next subbuffer */
602 err = kernctl_get_next_subbuf(infd);
603 if (err != 0) {
604 /*
605 * This is a debug message even for single-threaded consumer,
606 * because poll() have more relaxed criterions than get subbuf,
607 * so get_subbuf may fail for short race windows where poll()
608 * would issue wakeups.
609 */
610 DBG("Reserving sub buffer failed (everything is normal, "
611 "it is due to concurrency)");
612 goto end;
613 }
614
615 switch (stream->output) {
616 case LTTNG_EVENT_SPLICE:
617 /* read the whole subbuffer */
618 err = kernctl_get_padded_subbuf_size(infd, &len);
619 if (err != 0) {
620 errno = -ret;
621 perror("Getting sub-buffer len failed.");
622 goto end;
623 }
624
625 /* splice the subbuffer to the tracefile */
626 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
627 if (ret != len) {
628 /*
629 * display the error but continue processing to try
630 * to release the subbuffer
631 */
632 ERR("Error splicing to tracefile (ret: %ld != len: %ld)",
633 ret, len);
634 }
635
636 break;
637 case LTTNG_EVENT_MMAP:
638 /* read the used subbuffer size */
639 err = kernctl_get_padded_subbuf_size(infd, &len);
640 if (err != 0) {
641 errno = -ret;
642 perror("Getting sub-buffer len failed.");
643 goto end;
644 }
645 /* write the subbuffer to the tracefile */
646 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
647 if (ret != len) {
648 /*
649 * display the error but continue processing to try
650 * to release the subbuffer
651 */
652 ERR("Error writing to tracefile");
653 }
654 break;
655 default:
656 ERR("Unknown output method");
657 ret = -1;
658 }
659
660 err = kernctl_put_next_subbuf(infd);
661 if (err != 0) {
662 errno = -ret;
663 if (errno == EFAULT) {
664 perror("Error in unreserving sub buffer\n");
665 } else if (errno == EIO) {
666 /* Should never happen with newer LTTng versions */
667 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
668 }
669 goto end;
670 }
671
672 end:
673 return ret;
674 }
675
676 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
677 {
678 int ret;
679
680 /* Opening the tracefile in write mode */
681 if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
682 ret = run_as_open(stream->path_name,
683 O_WRONLY|O_CREAT|O_TRUNC,
684 S_IRWXU|S_IRWXG|S_IRWXO,
685 stream->uid, stream->gid);
686 if (ret < 0) {
687 ERR("Opening %s", stream->path_name);
688 perror("open");
689 goto error;
690 }
691 stream->out_fd = ret;
692 }
693
694 if (stream->output == LTTNG_EVENT_MMAP) {
695 /* get the len of the mmap region */
696 unsigned long mmap_len;
697
698 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
699 if (ret != 0) {
700 errno = -ret;
701 perror("kernctl_get_mmap_len");
702 goto error_close_fd;
703 }
704 stream->mmap_len = (size_t) mmap_len;
705
706 stream->mmap_base = mmap(NULL, stream->mmap_len,
707 PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
708 if (stream->mmap_base == MAP_FAILED) {
709 perror("Error mmaping");
710 ret = -1;
711 goto error_close_fd;
712 }
713 }
714
715 /* we return 0 to let the library handle the FD internally */
716 return 0;
717
718 error_close_fd:
719 {
720 int err;
721
722 err = close(stream->out_fd);
723 assert(!err);
724 }
725 error:
726 return ret;
727 }
728
This page took 0.071471 seconds and 4 git commands to generate.