Fix: relayd metadata size
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/mman.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <unistd.h>
29 #include <sys/stat.h>
30
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/sessiond-comm.h>
34 #include <common/sessiond-comm/relayd.h>
35 #include <common/compat/fcntl.h>
36 #include <common/relayd/relayd.h>
37
38 #include "kernel-consumer.h"
39
40 extern struct lttng_consumer_global_data consumer_data;
41 extern int consumer_poll_timeout;
42 extern volatile int consumer_quit;
43
44 /*
45 * Mmap the ring buffer, read it and write the data to the tracefile.
46 *
47 * Returns the number of bytes written
48 */
49 ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
50 struct lttng_consumer_local_data *ctx,
51 struct lttng_consumer_stream *stream, unsigned long len)
52 {
53 unsigned long mmap_offset;
54 ssize_t ret = 0, written = 0;
55 off_t orig_offset = stream->out_fd_offset;
56 int fd = stream->wait_fd;
57 /* Default is on the disk */
58 int outfd = stream->out_fd;
59 uint64_t metadata_id;
60 struct consumer_relayd_sock_pair *relayd = NULL;
61
62 /* RCU lock for the relayd pointer */
63 rcu_read_lock();
64
65 /* Flag that the current stream if set for network streaming. */
66 if (stream->net_seq_idx != -1) {
67 relayd = consumer_find_relayd(stream->net_seq_idx);
68 if (relayd == NULL) {
69 goto end;
70 }
71 }
72
73 /* get the offset inside the fd to mmap */
74 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
75 if (ret != 0) {
76 errno = -ret;
77 perror("kernctl_get_mmap_read_offset");
78 written = ret;
79 goto end;
80 }
81
82 /* Handle stream on the relayd if the output is on the network */
83 if (relayd) {
84 unsigned long netlen = len;
85
86 /*
87 * Lock the control socket for the complete duration of the function
88 * since from this point on we will use the socket.
89 */
90 if (stream->metadata_flag) {
91 /* Metadata requires the control socket. */
92 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
93 netlen += sizeof(stream->relayd_stream_id);
94 }
95
96 ret = consumer_handle_stream_before_relayd(stream, netlen);
97 if (ret >= 0) {
98 /* Use the returned socket. */
99 outfd = ret;
100
101 /* Write metadata stream id before payload */
102 if (stream->metadata_flag) {
103 metadata_id = htobe64(stream->relayd_stream_id);
104 do {
105 ret = write(outfd, (void *) &metadata_id,
106 sizeof(stream->relayd_stream_id));
107 } while (ret < 0 && errno == EINTR);
108 if (ret < 0) {
109 PERROR("write metadata stream id");
110 written = ret;
111 goto end;
112 }
113 DBG("Metadata stream id %zu written before data",
114 stream->relayd_stream_id);
115 /*
116 * We do this so the return value can match the len passed as
117 * argument to this function.
118 */
119 written -= sizeof(stream->relayd_stream_id);
120 }
121 }
122 /* Else, use the default set before which is the filesystem. */
123 }
124
125 while (len > 0) {
126 do {
127 ret = write(outfd, stream->mmap_base + mmap_offset, len);
128 } while (ret < 0 && errno == EINTR);
129 if (ret < 0) {
130 perror("Error in file write");
131 if (written == 0) {
132 written = ret;
133 }
134 goto end;
135 } else if (ret > len) {
136 perror("Error in file write");
137 written += ret;
138 goto end;
139 } else {
140 len -= ret;
141 mmap_offset += ret;
142 }
143
144 /* This call is useless on a socket so better save a syscall. */
145 if (!relayd) {
146 /* This won't block, but will start writeout asynchronously */
147 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
148 SYNC_FILE_RANGE_WRITE);
149 stream->out_fd_offset += ret;
150 }
151 written += ret;
152 }
153 lttng_consumer_sync_trace_file(stream, orig_offset);
154
155 end:
156 /* Unlock only if ctrl socket used */
157 if (relayd && stream->metadata_flag) {
158 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
159 }
160
161 rcu_read_unlock();
162
163 return written;
164 }
165
166 /*
167 * Splice the data from the ring buffer to the tracefile.
168 *
169 * Returns the number of bytes spliced.
170 */
171 ssize_t lttng_kconsumer_on_read_subbuffer_splice(
172 struct lttng_consumer_local_data *ctx,
173 struct lttng_consumer_stream *stream, unsigned long len)
174 {
175 ssize_t ret = 0, written = 0, ret_splice = 0;
176 loff_t offset = 0;
177 off_t orig_offset = stream->out_fd_offset;
178 int fd = stream->wait_fd;
179 /* Default is on the disk */
180 int outfd = stream->out_fd;
181 uint64_t metadata_id;
182 struct consumer_relayd_sock_pair *relayd = NULL;
183
184 /* RCU lock for the relayd pointer */
185 rcu_read_lock();
186
187 /* Flag that the current stream if set for network streaming. */
188 if (stream->net_seq_idx != -1) {
189 relayd = consumer_find_relayd(stream->net_seq_idx);
190 if (relayd == NULL) {
191 goto end;
192 }
193 }
194
195 /* Write metadata stream id before payload */
196 if (stream->metadata_flag && relayd) {
197 /*
198 * Lock the control socket for the complete duration of the function
199 * since from this point on we will use the socket.
200 */
201 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
202
203 metadata_id = htobe64(stream->relayd_stream_id);
204 do {
205 ret = write(ctx->consumer_thread_pipe[1],
206 (void *) &metadata_id,
207 sizeof(stream->relayd_stream_id));
208 } while (ret < 0 && errno == EINTR);
209 if (ret < 0) {
210 PERROR("write metadata stream id");
211 written = ret;
212 goto end;
213 }
214 DBG("Metadata stream id %zu written before data",
215 stream->relayd_stream_id);
216 }
217
218 while (len > 0) {
219 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
220 (unsigned long)offset, len, fd);
221 ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
222 SPLICE_F_MOVE | SPLICE_F_MORE);
223 DBG("splice chan to pipe, ret %zd", ret_splice);
224 if (ret_splice < 0) {
225 perror("Error in relay splice");
226 if (written == 0) {
227 written = ret_splice;
228 }
229 ret = errno;
230 goto splice_error;
231 }
232
233 /* Handle stream on the relayd if the output is on the network */
234 if (relayd) {
235 if (stream->metadata_flag) {
236 /* Update counter to fit the spliced data */
237 ret_splice += sizeof(stream->relayd_stream_id);
238 len += sizeof(stream->relayd_stream_id);
239 /*
240 * We do this so the return value can match the len passed as
241 * argument to this function.
242 */
243 written -= sizeof(stream->relayd_stream_id);
244 }
245
246 ret = consumer_handle_stream_before_relayd(stream, ret_splice);
247 if (ret >= 0) {
248 /* Use the returned socket. */
249 outfd = ret;
250 } else {
251 if (outfd == -1) {
252 ERR("Remote relayd disconnected. Stopping");
253 goto end;
254 }
255 }
256 }
257
258 DBG3("Kernel consumer splice data in %d to out %d",
259 ctx->consumer_thread_pipe[0], outfd);
260 ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
261 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
262 DBG("splice pipe to file, ret %zd", ret_splice);
263 if (ret_splice < 0) {
264 perror("Error in file splice");
265 if (written == 0) {
266 written = ret_splice;
267 }
268 ret = errno;
269 goto splice_error;
270 }
271 if (ret_splice > len) {
272 errno = EINVAL;
273 PERROR("Wrote more data than requested %zd (len: %lu)",
274 ret_splice, len);
275 written += ret_splice;
276 ret = errno;
277 goto splice_error;
278 }
279 len -= ret_splice;
280
281 /* This call is useless on a socket so better save a syscall. */
282 if (!relayd) {
283 /* This won't block, but will start writeout asynchronously */
284 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
285 SYNC_FILE_RANGE_WRITE);
286 stream->out_fd_offset += ret_splice;
287 }
288 written += ret_splice;
289 }
290 lttng_consumer_sync_trace_file(stream, orig_offset);
291
292 ret = ret_splice;
293
294 goto end;
295
296 splice_error:
297 /* send the appropriate error description to sessiond */
298 switch (ret) {
299 case EBADF:
300 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
301 break;
302 case EINVAL:
303 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
304 break;
305 case ENOMEM:
306 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
307 break;
308 case ESPIPE:
309 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
310 break;
311 }
312
313 end:
314 if (relayd && stream->metadata_flag) {
315 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
316 }
317
318 rcu_read_unlock();
319
320 return written;
321 }
322
323 /*
324 * Take a snapshot for a specific fd
325 *
326 * Returns 0 on success, < 0 on error
327 */
328 int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
329 struct lttng_consumer_stream *stream)
330 {
331 int ret = 0;
332 int infd = stream->wait_fd;
333
334 ret = kernctl_snapshot(infd);
335 if (ret != 0) {
336 errno = -ret;
337 perror("Getting sub-buffer snapshot.");
338 }
339
340 return ret;
341 }
342
343 /*
344 * Get the produced position
345 *
346 * Returns 0 on success, < 0 on error
347 */
348 int lttng_kconsumer_get_produced_snapshot(
349 struct lttng_consumer_local_data *ctx,
350 struct lttng_consumer_stream *stream,
351 unsigned long *pos)
352 {
353 int ret;
354 int infd = stream->wait_fd;
355
356 ret = kernctl_snapshot_get_produced(infd, pos);
357 if (ret != 0) {
358 errno = -ret;
359 perror("kernctl_snapshot_get_produced");
360 }
361
362 return ret;
363 }
364
365 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
366 int sock, struct pollfd *consumer_sockpoll)
367 {
368 ssize_t ret;
369 struct lttcomm_consumer_msg msg;
370
371 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
372 if (ret != sizeof(msg)) {
373 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
374 return ret;
375 }
376 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
377 return -ENOENT;
378 }
379
380 /* relayd needs RCU read-side protection */
381 rcu_read_lock();
382
383 switch (msg.cmd_type) {
384 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
385 {
386 int fd;
387 struct consumer_relayd_sock_pair *relayd;
388
389 DBG("Consumer adding relayd socket");
390
391 /* Get relayd reference if exists. */
392 relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
393 if (relayd == NULL) {
394 /* Not found. Allocate one. */
395 relayd = consumer_allocate_relayd_sock_pair(
396 msg.u.relayd_sock.net_index);
397 if (relayd == NULL) {
398 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
399 goto end_nosignal;
400 }
401 }
402
403 /* Poll on consumer socket. */
404 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
405 return -EINTR;
406 }
407
408 /* Get relayd socket from session daemon */
409 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
410 if (ret != sizeof(fd)) {
411 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
412 goto end_nosignal;
413 }
414
415 /* Copy socket information and received FD */
416 switch (msg.u.relayd_sock.type) {
417 case LTTNG_STREAM_CONTROL:
418 /* Copy received lttcomm socket */
419 lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
420
421 ret = lttcomm_create_sock(&relayd->control_sock);
422 if (ret < 0) {
423 goto end_nosignal;
424 }
425
426 /* Close the created socket fd which is useless */
427 close(relayd->control_sock.fd);
428
429 /* Assign new file descriptor */
430 relayd->control_sock.fd = fd;
431 break;
432 case LTTNG_STREAM_DATA:
433 /* Copy received lttcomm socket */
434 lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
435 ret = lttcomm_create_sock(&relayd->data_sock);
436 if (ret < 0) {
437 goto end_nosignal;
438 }
439
440 /* Close the created socket fd which is useless */
441 close(relayd->data_sock.fd);
442
443 /* Assign new file descriptor */
444 relayd->data_sock.fd = fd;
445 break;
446 default:
447 ERR("Unknown relayd socket type");
448 goto end_nosignal;
449 }
450
451 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
452 msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
453 relayd->net_seq_idx, fd);
454
455 /*
456 * Add relayd socket pair to consumer data hashtable. If object already
457 * exists or on error, the function gracefully returns.
458 */
459 consumer_add_relayd(relayd);
460
461 goto end_nosignal;
462 }
463 case LTTNG_CONSUMER_ADD_CHANNEL:
464 {
465 struct lttng_consumer_channel *new_channel;
466
467 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
468 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
469 -1, -1,
470 msg.u.channel.mmap_len,
471 msg.u.channel.max_sb_size);
472 if (new_channel == NULL) {
473 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
474 goto end_nosignal;
475 }
476 if (ctx->on_recv_channel != NULL) {
477 ret = ctx->on_recv_channel(new_channel);
478 if (ret == 0) {
479 consumer_add_channel(new_channel);
480 } else if (ret < 0) {
481 goto end_nosignal;
482 }
483 } else {
484 consumer_add_channel(new_channel);
485 }
486 goto end_nosignal;
487 }
488 case LTTNG_CONSUMER_ADD_STREAM:
489 {
490 int fd;
491 struct consumer_relayd_sock_pair *relayd = NULL;
492 struct lttng_consumer_stream *new_stream;
493
494 /* block */
495 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
496 return -EINTR;
497 }
498
499 /* Get stream file descriptor from socket */
500 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
501 if (ret != sizeof(fd)) {
502 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
503 return ret;
504 }
505
506 new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
507 msg.u.stream.stream_key,
508 fd, fd,
509 msg.u.stream.state,
510 msg.u.stream.mmap_len,
511 msg.u.stream.output,
512 msg.u.stream.path_name,
513 msg.u.stream.uid,
514 msg.u.stream.gid,
515 msg.u.stream.net_index,
516 msg.u.stream.metadata_flag);
517 if (new_stream == NULL) {
518 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
519 goto end;
520 }
521
522 /* The stream is not metadata. Get relayd reference if exists. */
523 relayd = consumer_find_relayd(msg.u.stream.net_index);
524 if (relayd != NULL) {
525 /* Add stream on the relayd */
526 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
527 ret = relayd_add_stream(&relayd->control_sock,
528 msg.u.stream.name, msg.u.stream.path_name,
529 &new_stream->relayd_stream_id);
530 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
531 if (ret < 0) {
532 goto end;
533 }
534 } else if (msg.u.stream.net_index != -1) {
535 ERR("Network sequence index %d unknown. Not adding stream.",
536 msg.u.stream.net_index);
537 free(new_stream);
538 goto end;
539 }
540
541 if (ctx->on_recv_stream != NULL) {
542 ret = ctx->on_recv_stream(new_stream);
543 if (ret == 0) {
544 consumer_add_stream(new_stream);
545 } else if (ret < 0) {
546 goto end;
547 }
548 } else {
549 consumer_add_stream(new_stream);
550 }
551
552 DBG("Kernel consumer_add_stream (%d)", fd);
553 break;
554 }
555 case LTTNG_CONSUMER_UPDATE_STREAM:
556 {
557 if (ctx->on_update_stream != NULL) {
558 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
559 if (ret == 0) {
560 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
561 } else if (ret < 0) {
562 goto end;
563 }
564 } else {
565 consumer_change_stream_state(msg.u.stream.stream_key,
566 msg.u.stream.state);
567 }
568 break;
569 }
570 default:
571 break;
572 }
573 end:
574 /*
575 * Wake-up the other end by writing a null byte in the pipe
576 * (non-blocking). Important note: Because writing into the
577 * pipe is non-blocking (and therefore we allow dropping wakeup
578 * data, as long as there is wakeup data present in the pipe
579 * buffer to wake up the other end), the other end should
580 * perform the following sequence for waiting:
581 * 1) empty the pipe (reads).
582 * 2) perform update operation.
583 * 3) wait on the pipe (poll).
584 */
585 do {
586 ret = write(ctx->consumer_poll_pipe[1], "", 1);
587 } while (ret < 0 && errno == EINTR);
588 end_nosignal:
589 rcu_read_unlock();
590 return 0;
591 }
592
593 /*
594 * Consume data on a file descriptor and write it on a trace file.
595 */
596 ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
597 struct lttng_consumer_local_data *ctx)
598 {
599 unsigned long len;
600 int err;
601 ssize_t ret = 0;
602 int infd = stream->wait_fd;
603
604 DBG("In read_subbuffer (infd : %d)", infd);
605 /* Get the next subbuffer */
606 err = kernctl_get_next_subbuf(infd);
607 if (err != 0) {
608 /*
609 * This is a debug message even for single-threaded consumer,
610 * because poll() have more relaxed criterions than get subbuf,
611 * so get_subbuf may fail for short race windows where poll()
612 * would issue wakeups.
613 */
614 DBG("Reserving sub buffer failed (everything is normal, "
615 "it is due to concurrency)");
616 goto end;
617 }
618
619 switch (stream->output) {
620 case LTTNG_EVENT_SPLICE:
621 /* read the whole subbuffer */
622 err = kernctl_get_padded_subbuf_size(infd, &len);
623 if (err != 0) {
624 errno = -ret;
625 perror("Getting sub-buffer len failed.");
626 goto end;
627 }
628
629 /* splice the subbuffer to the tracefile */
630 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
631 if (ret != len) {
632 /*
633 * display the error but continue processing to try
634 * to release the subbuffer
635 */
636 ERR("Error splicing to tracefile (ret: %ld != len: %ld)",
637 ret, len);
638 }
639
640 break;
641 case LTTNG_EVENT_MMAP:
642 /* read the used subbuffer size */
643 err = kernctl_get_padded_subbuf_size(infd, &len);
644 if (err != 0) {
645 errno = -ret;
646 perror("Getting sub-buffer len failed.");
647 goto end;
648 }
649 /* write the subbuffer to the tracefile */
650 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
651 if (ret != len) {
652 /*
653 * display the error but continue processing to try
654 * to release the subbuffer
655 */
656 ERR("Error writing to tracefile");
657 }
658 break;
659 default:
660 ERR("Unknown output method");
661 ret = -1;
662 }
663
664 err = kernctl_put_next_subbuf(infd);
665 if (err != 0) {
666 errno = -ret;
667 if (errno == EFAULT) {
668 perror("Error in unreserving sub buffer\n");
669 } else if (errno == EIO) {
670 /* Should never happen with newer LTTng versions */
671 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
672 }
673 goto end;
674 }
675
676 end:
677 return ret;
678 }
679
680 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
681 {
682 int ret;
683
684 /* Opening the tracefile in write mode */
685 if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
686 ret = run_as_open(stream->path_name,
687 O_WRONLY|O_CREAT|O_TRUNC,
688 S_IRWXU|S_IRWXG|S_IRWXO,
689 stream->uid, stream->gid);
690 if (ret < 0) {
691 ERR("Opening %s", stream->path_name);
692 perror("open");
693 goto error;
694 }
695 stream->out_fd = ret;
696 }
697
698 if (stream->output == LTTNG_EVENT_MMAP) {
699 /* get the len of the mmap region */
700 unsigned long mmap_len;
701
702 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
703 if (ret != 0) {
704 errno = -ret;
705 perror("kernctl_get_mmap_len");
706 goto error_close_fd;
707 }
708 stream->mmap_len = (size_t) mmap_len;
709
710 stream->mmap_base = mmap(NULL, stream->mmap_len,
711 PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
712 if (stream->mmap_base == MAP_FAILED) {
713 perror("Error mmaping");
714 ret = -1;
715 goto error_close_fd;
716 }
717 }
718
719 /* we return 0 to let the library handle the FD internally */
720 return 0;
721
722 error_close_fd:
723 {
724 int err;
725
726 err = close(stream->out_fd);
727 assert(!err);
728 }
729 error:
730 return ret;
731 }
732
This page took 0.044466 seconds and 4 git commands to generate.