fix: write EINTR handling
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/mman.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <unistd.h>
29 #include <sys/stat.h>
30
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/sessiond-comm.h>
34 #include <common/sessiond-comm/relayd.h>
35 #include <common/compat/fcntl.h>
36 #include <common/relayd/relayd.h>
37
38 #include "kernel-consumer.h"
39
/* Globals defined in another translation unit. */
extern volatile int consumer_quit;
extern int consumer_poll_timeout;
extern struct lttng_consumer_global_data consumer_data;
43
44 /*
45 * Mmap the ring buffer, read it and write the data to the tracefile.
46 *
47 * Returns the number of bytes written
48 */
49 ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
50 struct lttng_consumer_local_data *ctx,
51 struct lttng_consumer_stream *stream, unsigned long len)
52 {
53 unsigned long mmap_offset;
54 ssize_t ret = 0, written = 0;
55 off_t orig_offset = stream->out_fd_offset;
56 int fd = stream->wait_fd;
57 /* Default is on the disk */
58 int outfd = stream->out_fd;
59 uint64_t metadata_id;
60 struct consumer_relayd_sock_pair *relayd = NULL;
61
62 /* Flag that the current stream if set for network streaming. */
63 if (stream->net_seq_idx != -1) {
64 relayd = consumer_find_relayd(stream->net_seq_idx);
65 if (relayd == NULL) {
66 goto end;
67 }
68 }
69
70 /* get the offset inside the fd to mmap */
71 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
72 if (ret != 0) {
73 errno = -ret;
74 perror("kernctl_get_mmap_read_offset");
75 written = ret;
76 goto end;
77 }
78
79 /* RCU lock for the relayd pointer */
80 rcu_read_lock();
81
82 /* Handle stream on the relayd if the output is on the network */
83 if (relayd) {
84 /*
85 * Lock the control socket for the complete duration of the function
86 * since from this point on we will use the socket.
87 */
88 if (stream->metadata_flag) {
89 /* Metadata requires the control socket. */
90 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
91 }
92
93 ret = consumer_handle_stream_before_relayd(stream, len);
94 if (ret >= 0) {
95 /* Use the returned socket. */
96 outfd = ret;
97
98 /* Write metadata stream id before payload */
99 if (stream->metadata_flag) {
100 metadata_id = htobe64(stream->relayd_stream_id);
101 do {
102 ret = write(outfd, (void *) &metadata_id,
103 sizeof(stream->relayd_stream_id));
104 } while (ret < 0 && errno == EINTR);
105 if (ret < 0) {
106 PERROR("write metadata stream id");
107 written = ret;
108 goto end;
109 }
110 DBG("Metadata stream id %zu written before data",
111 stream->relayd_stream_id);
112 /*
113 * We do this so the return value can match the len passed as
114 * argument to this function.
115 */
116 written -= sizeof(stream->relayd_stream_id);
117 }
118 }
119 /* Else, use the default set before which is the filesystem. */
120 }
121
122 while (len > 0) {
123 do {
124 ret = write(outfd, stream->mmap_base + mmap_offset, len);
125 } while (ret < 0 && errno == EINTR);
126 if (ret < 0) {
127 perror("Error in file write");
128 if (written == 0) {
129 written = ret;
130 }
131 goto end;
132 } else if (ret > len) {
133 perror("Error in file write");
134 written += ret;
135 goto end;
136 } else {
137 len -= ret;
138 mmap_offset += ret;
139 }
140
141 /* This call is useless on a socket so better save a syscall. */
142 if (!relayd) {
143 /* This won't block, but will start writeout asynchronously */
144 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
145 SYNC_FILE_RANGE_WRITE);
146 stream->out_fd_offset += ret;
147 }
148 written += ret;
149 }
150 lttng_consumer_sync_trace_file(stream, orig_offset);
151
152 end:
153 /* Unlock only if ctrl socket used */
154 if (relayd && stream->metadata_flag) {
155 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
156 }
157
158 rcu_read_unlock();
159
160 return written;
161 }
162
163 /*
164 * Splice the data from the ring buffer to the tracefile.
165 *
166 * Returns the number of bytes spliced.
167 */
168 ssize_t lttng_kconsumer_on_read_subbuffer_splice(
169 struct lttng_consumer_local_data *ctx,
170 struct lttng_consumer_stream *stream, unsigned long len)
171 {
172 ssize_t ret = 0, written = 0, ret_splice = 0;
173 loff_t offset = 0;
174 off_t orig_offset = stream->out_fd_offset;
175 int fd = stream->wait_fd;
176 /* Default is on the disk */
177 int outfd = stream->out_fd;
178 uint64_t metadata_id;
179 struct consumer_relayd_sock_pair *relayd = NULL;
180
181 /* Flag that the current stream if set for network streaming. */
182 if (stream->net_seq_idx != -1) {
183 relayd = consumer_find_relayd(stream->net_seq_idx);
184 if (relayd == NULL) {
185 goto end;
186 }
187 }
188
189 /* RCU lock for the relayd pointer */
190 rcu_read_lock();
191
192 /* Write metadata stream id before payload */
193 if (stream->metadata_flag && relayd) {
194 /*
195 * Lock the control socket for the complete duration of the function
196 * since from this point on we will use the socket.
197 */
198 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
199
200 metadata_id = htobe64(stream->relayd_stream_id);
201 do {
202 ret = write(ctx->consumer_thread_pipe[1],
203 (void *) &metadata_id,
204 sizeof(stream->relayd_stream_id));
205 } while (ret < 0 && errno == EINTR);
206 if (ret < 0) {
207 PERROR("write metadata stream id");
208 written = ret;
209 goto end;
210 }
211 DBG("Metadata stream id %zu written before data",
212 stream->relayd_stream_id);
213 }
214
215 while (len > 0) {
216 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
217 (unsigned long)offset, len, fd);
218 ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
219 SPLICE_F_MOVE | SPLICE_F_MORE);
220 DBG("splice chan to pipe, ret %zd", ret_splice);
221 if (ret_splice < 0) {
222 perror("Error in relay splice");
223 if (written == 0) {
224 written = ret_splice;
225 }
226 ret = errno;
227 goto splice_error;
228 }
229
230 /* Handle stream on the relayd if the output is on the network */
231 if (relayd) {
232 if (stream->metadata_flag) {
233 /* Update counter to fit the spliced data */
234 ret_splice += sizeof(stream->relayd_stream_id);
235 len += sizeof(stream->relayd_stream_id);
236 /*
237 * We do this so the return value can match the len passed as
238 * argument to this function.
239 */
240 written -= sizeof(stream->relayd_stream_id);
241 }
242
243 ret = consumer_handle_stream_before_relayd(stream, ret_splice);
244 if (ret >= 0) {
245 /* Use the returned socket. */
246 outfd = ret;
247 } else {
248 if (outfd == -1) {
249 ERR("Remote relayd disconnected. Stopping");
250 goto end;
251 }
252 }
253 }
254
255 DBG3("Kernel consumer splice data in %d to out %d",
256 ctx->consumer_thread_pipe[0], outfd);
257 ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
258 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
259 DBG("splice pipe to file, ret %zd", ret_splice);
260 if (ret_splice < 0) {
261 perror("Error in file splice");
262 if (written == 0) {
263 written = ret_splice;
264 }
265 ret = errno;
266 goto splice_error;
267 }
268 if (ret_splice > len) {
269 errno = EINVAL;
270 PERROR("Wrote more data than requested %zd (len: %lu)",
271 ret_splice, len);
272 written += ret_splice;
273 ret = errno;
274 goto splice_error;
275 }
276 len -= ret_splice;
277
278 /* This call is useless on a socket so better save a syscall. */
279 if (!relayd) {
280 /* This won't block, but will start writeout asynchronously */
281 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
282 SYNC_FILE_RANGE_WRITE);
283 stream->out_fd_offset += ret_splice;
284 }
285 written += ret_splice;
286 }
287 lttng_consumer_sync_trace_file(stream, orig_offset);
288
289 ret = ret_splice;
290
291 goto end;
292
293 splice_error:
294 /* send the appropriate error description to sessiond */
295 switch (ret) {
296 case EBADF:
297 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
298 break;
299 case EINVAL:
300 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
301 break;
302 case ENOMEM:
303 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
304 break;
305 case ESPIPE:
306 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
307 break;
308 }
309
310 end:
311 if (relayd && stream->metadata_flag) {
312 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
313 }
314
315 rcu_read_unlock();
316
317 return written;
318 }
319
320 /*
321 * Take a snapshot for a specific fd
322 *
323 * Returns 0 on success, < 0 on error
324 */
325 int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
326 struct lttng_consumer_stream *stream)
327 {
328 int ret = 0;
329 int infd = stream->wait_fd;
330
331 ret = kernctl_snapshot(infd);
332 if (ret != 0) {
333 errno = -ret;
334 perror("Getting sub-buffer snapshot.");
335 }
336
337 return ret;
338 }
339
340 /*
341 * Get the produced position
342 *
343 * Returns 0 on success, < 0 on error
344 */
345 int lttng_kconsumer_get_produced_snapshot(
346 struct lttng_consumer_local_data *ctx,
347 struct lttng_consumer_stream *stream,
348 unsigned long *pos)
349 {
350 int ret;
351 int infd = stream->wait_fd;
352
353 ret = kernctl_snapshot_get_produced(infd, pos);
354 if (ret != 0) {
355 errno = -ret;
356 perror("kernctl_snapshot_get_produced");
357 }
358
359 return ret;
360 }
361
362 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
363 int sock, struct pollfd *consumer_sockpoll)
364 {
365 ssize_t ret;
366 struct lttcomm_consumer_msg msg;
367
368 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
369 if (ret != sizeof(msg)) {
370 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
371 return ret;
372 }
373 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
374 return -ENOENT;
375 }
376
377 switch (msg.cmd_type) {
378 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
379 {
380 int fd;
381 struct consumer_relayd_sock_pair *relayd;
382
383 DBG("Consumer adding relayd socket");
384
385 /* Get relayd reference if exists. */
386 relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
387 if (relayd == NULL) {
388 /* Not found. Allocate one. */
389 relayd = consumer_allocate_relayd_sock_pair(
390 msg.u.relayd_sock.net_index);
391 if (relayd == NULL) {
392 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
393 goto end_nosignal;
394 }
395 }
396
397 /* Poll on consumer socket. */
398 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
399 return -EINTR;
400 }
401
402 /* Get relayd socket from session daemon */
403 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
404 if (ret != sizeof(fd)) {
405 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
406 goto end_nosignal;
407 }
408
409 /* Copy socket information and received FD */
410 switch (msg.u.relayd_sock.type) {
411 case LTTNG_STREAM_CONTROL:
412 /* Copy received lttcomm socket */
413 lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
414
415 ret = lttcomm_create_sock(&relayd->control_sock);
416 if (ret < 0) {
417 goto end_nosignal;
418 }
419
420 /* Close the created socket fd which is useless */
421 close(relayd->control_sock.fd);
422
423 /* Assign new file descriptor */
424 relayd->control_sock.fd = fd;
425 break;
426 case LTTNG_STREAM_DATA:
427 /* Copy received lttcomm socket */
428 lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
429 ret = lttcomm_create_sock(&relayd->data_sock);
430 if (ret < 0) {
431 goto end_nosignal;
432 }
433
434 /* Close the created socket fd which is useless */
435 close(relayd->data_sock.fd);
436
437 /* Assign new file descriptor */
438 relayd->data_sock.fd = fd;
439 break;
440 default:
441 ERR("Unknown relayd socket type");
442 goto end_nosignal;
443 }
444
445 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
446 msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
447 relayd->net_seq_idx, fd);
448
449 /*
450 * Add relayd socket pair to consumer data hashtable. If object already
451 * exists or on error, the function gracefully returns.
452 */
453 consumer_add_relayd(relayd);
454
455 goto end_nosignal;
456 }
457 case LTTNG_CONSUMER_ADD_CHANNEL:
458 {
459 struct lttng_consumer_channel *new_channel;
460
461 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
462 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
463 -1, -1,
464 msg.u.channel.mmap_len,
465 msg.u.channel.max_sb_size);
466 if (new_channel == NULL) {
467 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
468 goto end_nosignal;
469 }
470 if (ctx->on_recv_channel != NULL) {
471 ret = ctx->on_recv_channel(new_channel);
472 if (ret == 0) {
473 consumer_add_channel(new_channel);
474 } else if (ret < 0) {
475 goto end_nosignal;
476 }
477 } else {
478 consumer_add_channel(new_channel);
479 }
480 goto end_nosignal;
481 }
482 case LTTNG_CONSUMER_ADD_STREAM:
483 {
484 int fd;
485 struct consumer_relayd_sock_pair *relayd = NULL;
486 struct lttng_consumer_stream *new_stream;
487
488 /* block */
489 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
490 return -EINTR;
491 }
492
493 /* Get stream file descriptor from socket */
494 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
495 if (ret != sizeof(fd)) {
496 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
497 return ret;
498 }
499
500 new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
501 msg.u.stream.stream_key,
502 fd, fd,
503 msg.u.stream.state,
504 msg.u.stream.mmap_len,
505 msg.u.stream.output,
506 msg.u.stream.path_name,
507 msg.u.stream.uid,
508 msg.u.stream.gid,
509 msg.u.stream.net_index,
510 msg.u.stream.metadata_flag);
511 if (new_stream == NULL) {
512 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
513 goto end;
514 }
515
516 /* The stream is not metadata. Get relayd reference if exists. */
517 relayd = consumer_find_relayd(msg.u.stream.net_index);
518 if (relayd != NULL) {
519 /* Add stream on the relayd */
520 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
521 ret = relayd_add_stream(&relayd->control_sock,
522 msg.u.stream.name, msg.u.stream.path_name,
523 &new_stream->relayd_stream_id);
524 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
525 if (ret < 0) {
526 goto end;
527 }
528 } else if (msg.u.stream.net_index != -1) {
529 ERR("Network sequence index %d unknown. Not adding stream.",
530 msg.u.stream.net_index);
531 free(new_stream);
532 goto end;
533 }
534
535 if (ctx->on_recv_stream != NULL) {
536 ret = ctx->on_recv_stream(new_stream);
537 if (ret == 0) {
538 consumer_add_stream(new_stream);
539 } else if (ret < 0) {
540 goto end;
541 }
542 } else {
543 consumer_add_stream(new_stream);
544 }
545
546 DBG("Kernel consumer_add_stream (%d)", fd);
547 break;
548 }
549 case LTTNG_CONSUMER_UPDATE_STREAM:
550 {
551 if (ctx->on_update_stream != NULL) {
552 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
553 if (ret == 0) {
554 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
555 } else if (ret < 0) {
556 goto end;
557 }
558 } else {
559 consumer_change_stream_state(msg.u.stream.stream_key,
560 msg.u.stream.state);
561 }
562 break;
563 }
564 default:
565 break;
566 }
567 end:
568 /*
569 * Wake-up the other end by writing a null byte in the pipe
570 * (non-blocking). Important note: Because writing into the
571 * pipe is non-blocking (and therefore we allow dropping wakeup
572 * data, as long as there is wakeup data present in the pipe
573 * buffer to wake up the other end), the other end should
574 * perform the following sequence for waiting:
575 * 1) empty the pipe (reads).
576 * 2) perform update operation.
577 * 3) wait on the pipe (poll).
578 */
579 do {
580 ret = write(ctx->consumer_poll_pipe[1], "", 1);
581 } while (ret < 0 && errno == EINTR);
582 end_nosignal:
583 return 0;
584 }
585
586 /*
587 * Consume data on a file descriptor and write it on a trace file.
588 */
589 ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
590 struct lttng_consumer_local_data *ctx)
591 {
592 unsigned long len;
593 int err;
594 ssize_t ret = 0;
595 int infd = stream->wait_fd;
596
597 DBG("In read_subbuffer (infd : %d)", infd);
598 /* Get the next subbuffer */
599 err = kernctl_get_next_subbuf(infd);
600 if (err != 0) {
601 /*
602 * This is a debug message even for single-threaded consumer,
603 * because poll() have more relaxed criterions than get subbuf,
604 * so get_subbuf may fail for short race windows where poll()
605 * would issue wakeups.
606 */
607 DBG("Reserving sub buffer failed (everything is normal, "
608 "it is due to concurrency)");
609 goto end;
610 }
611
612 switch (stream->output) {
613 case LTTNG_EVENT_SPLICE:
614 /* read the whole subbuffer */
615 err = kernctl_get_padded_subbuf_size(infd, &len);
616 if (err != 0) {
617 errno = -ret;
618 perror("Getting sub-buffer len failed.");
619 goto end;
620 }
621
622 /* splice the subbuffer to the tracefile */
623 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
624 if (ret != len) {
625 /*
626 * display the error but continue processing to try
627 * to release the subbuffer
628 */
629 ERR("Error splicing to tracefile (ret: %ld != len: %ld)",
630 ret, len);
631 }
632
633 break;
634 case LTTNG_EVENT_MMAP:
635 /* read the used subbuffer size */
636 err = kernctl_get_padded_subbuf_size(infd, &len);
637 if (err != 0) {
638 errno = -ret;
639 perror("Getting sub-buffer len failed.");
640 goto end;
641 }
642 /* write the subbuffer to the tracefile */
643 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
644 if (ret != len) {
645 /*
646 * display the error but continue processing to try
647 * to release the subbuffer
648 */
649 ERR("Error writing to tracefile");
650 }
651 break;
652 default:
653 ERR("Unknown output method");
654 ret = -1;
655 }
656
657 err = kernctl_put_next_subbuf(infd);
658 if (err != 0) {
659 errno = -ret;
660 if (errno == EFAULT) {
661 perror("Error in unreserving sub buffer\n");
662 } else if (errno == EIO) {
663 /* Should never happen with newer LTTng versions */
664 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
665 }
666 goto end;
667 }
668
669 end:
670 return ret;
671 }
672
673 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
674 {
675 int ret;
676
677 /* Opening the tracefile in write mode */
678 if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
679 ret = run_as_open(stream->path_name,
680 O_WRONLY|O_CREAT|O_TRUNC,
681 S_IRWXU|S_IRWXG|S_IRWXO,
682 stream->uid, stream->gid);
683 if (ret < 0) {
684 ERR("Opening %s", stream->path_name);
685 perror("open");
686 goto error;
687 }
688 stream->out_fd = ret;
689 }
690
691 if (stream->output == LTTNG_EVENT_MMAP) {
692 /* get the len of the mmap region */
693 unsigned long mmap_len;
694
695 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
696 if (ret != 0) {
697 errno = -ret;
698 perror("kernctl_get_mmap_len");
699 goto error_close_fd;
700 }
701 stream->mmap_len = (size_t) mmap_len;
702
703 stream->mmap_base = mmap(NULL, stream->mmap_len,
704 PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
705 if (stream->mmap_base == MAP_FAILED) {
706 perror("Error mmaping");
707 ret = -1;
708 goto error_close_fd;
709 }
710 }
711
712 /* we return 0 to let the library handle the FD internally */
713 return 0;
714
715 error_close_fd:
716 {
717 int err;
718
719 err = close(stream->out_fd);
720 assert(!err);
721 }
722 error:
723 return ret;
724 }
725
This page took 0.043419 seconds and 4 git commands to generate.