Fix: relayd metadata size
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
3bd1e081
MD
21#include <poll.h>
22#include <pthread.h>
23#include <stdlib.h>
24#include <string.h>
25#include <sys/mman.h>
26#include <sys/socket.h>
27#include <sys/types.h>
28#include <unistd.h>
dbb5dfe6 29#include <sys/stat.h>
3bd1e081 30
990570ed 31#include <common/common.h>
10a8a223 32#include <common/kernel-ctl/kernel-ctl.h>
10a8a223 33#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 34#include <common/sessiond-comm/relayd.h>
dbb5dfe6 35#include <common/compat/fcntl.h>
00e2e675 36#include <common/relayd/relayd.h>
0857097f 37
10a8a223 38#include "kernel-consumer.h"
3bd1e081
MD
39
/*
 * Consumer-wide globals; only declared here, no definition is visible in this
 * file. None of them is referenced by the functions below.
 * NOTE(review): presumably kept for other consumer code -- confirm before
 * removing.
 */
40extern struct lttng_consumer_global_data consumer_data;
41extern int consumer_poll_timeout;
42extern volatile int consumer_quit;
43
44/*
45 * Mmap the ring buffer, read it and write the data to the tracefile.
46 *
47 * Returns the number of bytes written
48 */
4078b776 49ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
3bd1e081
MD
50 struct lttng_consumer_local_data *ctx,
51 struct lttng_consumer_stream *stream, unsigned long len)
52{
53 unsigned long mmap_offset;
47e81c02 54 ssize_t ret = 0, written = 0;
3bd1e081
MD
55 off_t orig_offset = stream->out_fd_offset;
56 int fd = stream->wait_fd;
00e2e675 57 /* Default is on the disk */
3bd1e081 58 int outfd = stream->out_fd;
00e2e675
DG
59 uint64_t metadata_id;
60 struct consumer_relayd_sock_pair *relayd = NULL;
61
b0b335c8
MD
62 /* RCU lock for the relayd pointer */
63 rcu_read_lock();
64
00e2e675
DG
65 /* Flag that the current stream if set for network streaming. */
66 if (stream->net_seq_idx != -1) {
67 relayd = consumer_find_relayd(stream->net_seq_idx);
68 if (relayd == NULL) {
69 goto end;
70 }
71 }
3bd1e081
MD
72
73 /* get the offset inside the fd to mmap */
74 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
75 if (ret != 0) {
87dc6a9c 76 errno = -ret;
3bd1e081 77 perror("kernctl_get_mmap_read_offset");
47e81c02 78 written = ret;
3bd1e081
MD
79 goto end;
80 }
81
00e2e675
DG
82 /* Handle stream on the relayd if the output is on the network */
83 if (relayd) {
f6416125
MD
84 unsigned long netlen = len;
85
00e2e675
DG
86 /*
87 * Lock the control socket for the complete duration of the function
88 * since from this point on we will use the socket.
89 */
90 if (stream->metadata_flag) {
91 /* Metadata requires the control socket. */
92 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
f6416125 93 netlen += sizeof(stream->relayd_stream_id);
00e2e675
DG
94 }
95
f6416125 96 ret = consumer_handle_stream_before_relayd(stream, netlen);
00e2e675
DG
97 if (ret >= 0) {
98 /* Use the returned socket. */
99 outfd = ret;
100
101 /* Write metadata stream id before payload */
102 if (stream->metadata_flag) {
103 metadata_id = htobe64(stream->relayd_stream_id);
104 do {
105 ret = write(outfd, (void *) &metadata_id,
106 sizeof(stream->relayd_stream_id));
6f94560a
MD
107 } while (ret < 0 && errno == EINTR);
108 if (ret < 0) {
109 PERROR("write metadata stream id");
110 written = ret;
111 goto end;
112 }
00e2e675
DG
113 DBG("Metadata stream id %zu written before data",
114 stream->relayd_stream_id);
115 /*
116 * We do this so the return value can match the len passed as
117 * argument to this function.
118 */
119 written -= sizeof(stream->relayd_stream_id);
120 }
121 }
122 /* Else, use the default set before which is the filesystem. */
123 }
124
3bd1e081 125 while (len > 0) {
6f94560a
MD
126 do {
127 ret = write(outfd, stream->mmap_base + mmap_offset, len);
128 } while (ret < 0 && errno == EINTR);
47e81c02 129 if (ret < 0) {
6f94560a
MD
130 perror("Error in file write");
131 if (written == 0) {
132 written = ret;
47e81c02 133 }
6f94560a 134 goto end;
47e81c02 135 } else if (ret > len) {
3bd1e081 136 perror("Error in file write");
47e81c02 137 written += ret;
3bd1e081 138 goto end;
47e81c02
MD
139 } else {
140 len -= ret;
141 mmap_offset += ret;
3bd1e081 142 }
00e2e675
DG
143
144 /* This call is useless on a socket so better save a syscall. */
145 if (!relayd) {
146 /* This won't block, but will start writeout asynchronously */
147 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
148 SYNC_FILE_RANGE_WRITE);
149 stream->out_fd_offset += ret;
150 }
47e81c02 151 written += ret;
3bd1e081 152 }
3bd1e081 153 lttng_consumer_sync_trace_file(stream, orig_offset);
00e2e675 154
3bd1e081 155end:
00e2e675
DG
156 /* Unlock only if ctrl socket used */
157 if (relayd && stream->metadata_flag) {
158 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
159 }
160
161 rcu_read_unlock();
162
47e81c02 163 return written;
3bd1e081
MD
164}
165
166/*
167 * Splice the data from the ring buffer to the tracefile.
168 *
169 * Returns the number of bytes spliced.
170 */
4078b776 171ssize_t lttng_kconsumer_on_read_subbuffer_splice(
3bd1e081
MD
172 struct lttng_consumer_local_data *ctx,
173 struct lttng_consumer_stream *stream, unsigned long len)
174{
00e2e675 175 ssize_t ret = 0, written = 0, ret_splice = 0;
3bd1e081
MD
176 loff_t offset = 0;
177 off_t orig_offset = stream->out_fd_offset;
178 int fd = stream->wait_fd;
00e2e675 179 /* Default is on the disk */
3bd1e081 180 int outfd = stream->out_fd;
00e2e675
DG
181 uint64_t metadata_id;
182 struct consumer_relayd_sock_pair *relayd = NULL;
183
b0b335c8
MD
184 /* RCU lock for the relayd pointer */
185 rcu_read_lock();
186
00e2e675
DG
187 /* Flag that the current stream if set for network streaming. */
188 if (stream->net_seq_idx != -1) {
189 relayd = consumer_find_relayd(stream->net_seq_idx);
190 if (relayd == NULL) {
191 goto end;
192 }
193 }
194
00e2e675
DG
195 /* Write metadata stream id before payload */
196 if (stream->metadata_flag && relayd) {
197 /*
198 * Lock the control socket for the complete duration of the function
199 * since from this point on we will use the socket.
200 */
201 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
202
6f94560a 203 metadata_id = htobe64(stream->relayd_stream_id);
00e2e675 204 do {
00e2e675
DG
205 ret = write(ctx->consumer_thread_pipe[1],
206 (void *) &metadata_id,
207 sizeof(stream->relayd_stream_id));
6f94560a
MD
208 } while (ret < 0 && errno == EINTR);
209 if (ret < 0) {
210 PERROR("write metadata stream id");
211 written = ret;
212 goto end;
213 }
00e2e675
DG
214 DBG("Metadata stream id %zu written before data",
215 stream->relayd_stream_id);
216 }
3bd1e081
MD
217
218 while (len > 0) {
00e2e675
DG
219 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
220 (unsigned long)offset, len, fd);
221 ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
3bd1e081 222 SPLICE_F_MOVE | SPLICE_F_MORE);
00e2e675
DG
223 DBG("splice chan to pipe, ret %zd", ret_splice);
224 if (ret_splice < 0) {
3bd1e081 225 perror("Error in relay splice");
47e81c02 226 if (written == 0) {
00e2e675 227 written = ret_splice;
47e81c02
MD
228 }
229 ret = errno;
3bd1e081
MD
230 goto splice_error;
231 }
232
00e2e675
DG
233 /* Handle stream on the relayd if the output is on the network */
234 if (relayd) {
235 if (stream->metadata_flag) {
236 /* Update counter to fit the spliced data */
237 ret_splice += sizeof(stream->relayd_stream_id);
238 len += sizeof(stream->relayd_stream_id);
239 /*
240 * We do this so the return value can match the len passed as
241 * argument to this function.
242 */
243 written -= sizeof(stream->relayd_stream_id);
244 }
245
246 ret = consumer_handle_stream_before_relayd(stream, ret_splice);
247 if (ret >= 0) {
248 /* Use the returned socket. */
249 outfd = ret;
250 } else {
251 if (outfd == -1) {
252 ERR("Remote relayd disconnected. Stopping");
253 goto end;
254 }
255 }
256 }
257
258 DBG3("Kernel consumer splice data in %d to out %d",
259 ctx->consumer_thread_pipe[0], outfd);
260 ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
261 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
262 DBG("splice pipe to file, ret %zd", ret_splice);
263 if (ret_splice < 0) {
3bd1e081 264 perror("Error in file splice");
47e81c02 265 if (written == 0) {
00e2e675 266 written = ret_splice;
47e81c02
MD
267 }
268 ret = errno;
269 goto splice_error;
270 }
00e2e675 271 if (ret_splice > len) {
47e81c02 272 errno = EINVAL;
00e2e675
DG
273 PERROR("Wrote more data than requested %zd (len: %lu)",
274 ret_splice, len);
275 written += ret_splice;
47e81c02 276 ret = errno;
3bd1e081
MD
277 goto splice_error;
278 }
00e2e675
DG
279 len -= ret_splice;
280
281 /* This call is useless on a socket so better save a syscall. */
282 if (!relayd) {
283 /* This won't block, but will start writeout asynchronously */
284 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
285 SYNC_FILE_RANGE_WRITE);
286 stream->out_fd_offset += ret_splice;
287 }
288 written += ret_splice;
3bd1e081
MD
289 }
290 lttng_consumer_sync_trace_file(stream, orig_offset);
291
00e2e675
DG
292 ret = ret_splice;
293
3bd1e081
MD
294 goto end;
295
296splice_error:
297 /* send the appropriate error description to sessiond */
47e81c02 298 switch (ret) {
3bd1e081
MD
299 case EBADF:
300 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
301 break;
302 case EINVAL:
303 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
304 break;
305 case ENOMEM:
306 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
307 break;
308 case ESPIPE:
309 lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
310 break;
311 }
312
313end:
00e2e675
DG
314 if (relayd && stream->metadata_flag) {
315 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
316 }
317
318 rcu_read_unlock();
319
47e81c02 320 return written;
3bd1e081
MD
321}
322
323/*
324 * Take a snapshot for a specific fd
325 *
326 * Returns 0 on success, < 0 on error
327 */
328int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
329 struct lttng_consumer_stream *stream)
330{
331 int ret = 0;
332 int infd = stream->wait_fd;
333
334 ret = kernctl_snapshot(infd);
335 if (ret != 0) {
87dc6a9c 336 errno = -ret;
3bd1e081
MD
337 perror("Getting sub-buffer snapshot.");
338 }
339
340 return ret;
341}
342
343/*
344 * Get the produced position
345 *
346 * Returns 0 on success, < 0 on error
347 */
348int lttng_kconsumer_get_produced_snapshot(
349 struct lttng_consumer_local_data *ctx,
350 struct lttng_consumer_stream *stream,
351 unsigned long *pos)
352{
353 int ret;
354 int infd = stream->wait_fd;
355
356 ret = kernctl_snapshot_get_produced(infd, pos);
357 if (ret != 0) {
87dc6a9c 358 errno = -ret;
3bd1e081
MD
359 perror("kernctl_snapshot_get_produced");
360 }
361
362 return ret;
363}
364
365int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
366 int sock, struct pollfd *consumer_sockpoll)
367{
368 ssize_t ret;
369 struct lttcomm_consumer_msg msg;
370
371 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
372 if (ret != sizeof(msg)) {
f2fc6720 373 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
3bd1e081
MD
374 return ret;
375 }
376 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
377 return -ENOENT;
378 }
379
b0b335c8
MD
380 /* relayd needs RCU read-side protection */
381 rcu_read_lock();
382
3bd1e081 383 switch (msg.cmd_type) {
00e2e675
DG
384 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
385 {
386 int fd;
387 struct consumer_relayd_sock_pair *relayd;
388
389 DBG("Consumer adding relayd socket");
390
391 /* Get relayd reference if exists. */
392 relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
393 if (relayd == NULL) {
394 /* Not found. Allocate one. */
395 relayd = consumer_allocate_relayd_sock_pair(
396 msg.u.relayd_sock.net_index);
397 if (relayd == NULL) {
398 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
399 goto end_nosignal;
400 }
401 }
402
403 /* Poll on consumer socket. */
404 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
405 return -EINTR;
406 }
407
408 /* Get relayd socket from session daemon */
409 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
410 if (ret != sizeof(fd)) {
411 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
412 goto end_nosignal;
413 }
414
415 /* Copy socket information and received FD */
416 switch (msg.u.relayd_sock.type) {
417 case LTTNG_STREAM_CONTROL:
418 /* Copy received lttcomm socket */
419 lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
420
421 ret = lttcomm_create_sock(&relayd->control_sock);
422 if (ret < 0) {
423 goto end_nosignal;
424 }
425
426 /* Close the created socket fd which is useless */
427 close(relayd->control_sock.fd);
428
429 /* Assign new file descriptor */
430 relayd->control_sock.fd = fd;
431 break;
432 case LTTNG_STREAM_DATA:
433 /* Copy received lttcomm socket */
434 lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
435 ret = lttcomm_create_sock(&relayd->data_sock);
436 if (ret < 0) {
437 goto end_nosignal;
438 }
439
440 /* Close the created socket fd which is useless */
441 close(relayd->data_sock.fd);
442
443 /* Assign new file descriptor */
444 relayd->data_sock.fd = fd;
445 break;
446 default:
447 ERR("Unknown relayd socket type");
448 goto end_nosignal;
449 }
450
451 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
452 msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
453 relayd->net_seq_idx, fd);
454
455 /*
456 * Add relayd socket pair to consumer data hashtable. If object already
457 * exists or on error, the function gracefully returns.
458 */
459 consumer_add_relayd(relayd);
460
461 goto end_nosignal;
462 }
3bd1e081
MD
463 case LTTNG_CONSUMER_ADD_CHANNEL:
464 {
465 struct lttng_consumer_channel *new_channel;
466
467 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
468 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
469 -1, -1,
470 msg.u.channel.mmap_len,
471 msg.u.channel.max_sb_size);
472 if (new_channel == NULL) {
473 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
474 goto end_nosignal;
475 }
476 if (ctx->on_recv_channel != NULL) {
477 ret = ctx->on_recv_channel(new_channel);
478 if (ret == 0) {
479 consumer_add_channel(new_channel);
480 } else if (ret < 0) {
481 goto end_nosignal;
482 }
483 } else {
484 consumer_add_channel(new_channel);
485 }
486 goto end_nosignal;
487 }
488 case LTTNG_CONSUMER_ADD_STREAM:
489 {
f2fc6720 490 int fd;
00e2e675
DG
491 struct consumer_relayd_sock_pair *relayd = NULL;
492 struct lttng_consumer_stream *new_stream;
3bd1e081
MD
493
494 /* block */
495 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
496 return -EINTR;
497 }
00e2e675
DG
498
499 /* Get stream file descriptor from socket */
f2fc6720
MD
500 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
501 if (ret != sizeof(fd)) {
3bd1e081
MD
502 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
503 return ret;
504 }
3bd1e081 505
3bd1e081
MD
506 new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
507 msg.u.stream.stream_key,
f2fc6720 508 fd, fd,
3bd1e081
MD
509 msg.u.stream.state,
510 msg.u.stream.mmap_len,
511 msg.u.stream.output,
6df2e2c9
MD
512 msg.u.stream.path_name,
513 msg.u.stream.uid,
00e2e675
DG
514 msg.u.stream.gid,
515 msg.u.stream.net_index,
516 msg.u.stream.metadata_flag);
3bd1e081
MD
517 if (new_stream == NULL) {
518 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
519 goto end;
520 }
00e2e675
DG
521
522 /* The stream is not metadata. Get relayd reference if exists. */
523 relayd = consumer_find_relayd(msg.u.stream.net_index);
524 if (relayd != NULL) {
525 /* Add stream on the relayd */
526 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
527 ret = relayd_add_stream(&relayd->control_sock,
528 msg.u.stream.name, msg.u.stream.path_name,
529 &new_stream->relayd_stream_id);
530 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
531 if (ret < 0) {
532 goto end;
533 }
534 } else if (msg.u.stream.net_index != -1) {
535 ERR("Network sequence index %d unknown. Not adding stream.",
536 msg.u.stream.net_index);
537 free(new_stream);
538 goto end;
539 }
540
3bd1e081
MD
541 if (ctx->on_recv_stream != NULL) {
542 ret = ctx->on_recv_stream(new_stream);
543 if (ret == 0) {
544 consumer_add_stream(new_stream);
545 } else if (ret < 0) {
546 goto end;
547 }
548 } else {
549 consumer_add_stream(new_stream);
550 }
00e2e675
DG
551
552 DBG("Kernel consumer_add_stream (%d)", fd);
3bd1e081
MD
553 break;
554 }
555 case LTTNG_CONSUMER_UPDATE_STREAM:
556 {
557 if (ctx->on_update_stream != NULL) {
558 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
559 if (ret == 0) {
560 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
561 } else if (ret < 0) {
562 goto end;
563 }
564 } else {
565 consumer_change_stream_state(msg.u.stream.stream_key,
566 msg.u.stream.state);
567 }
568 break;
569 }
570 default:
571 break;
572 }
573end:
04fdd819
MD
574 /*
575 * Wake-up the other end by writing a null byte in the pipe
576 * (non-blocking). Important note: Because writing into the
577 * pipe is non-blocking (and therefore we allow dropping wakeup
578 * data, as long as there is wakeup data present in the pipe
579 * buffer to wake up the other end), the other end should
580 * perform the following sequence for waiting:
581 * 1) empty the pipe (reads).
582 * 2) perform update operation.
583 * 3) wait on the pipe (poll).
584 */
585 do {
586 ret = write(ctx->consumer_poll_pipe[1], "", 1);
6f94560a 587 } while (ret < 0 && errno == EINTR);
3bd1e081 588end_nosignal:
b0b335c8 589 rcu_read_unlock();
3bd1e081
MD
590 return 0;
591}
d41f73b7
MD
592
593/*
594 * Consume data on a file descriptor and write it on a trace file.
595 */
4078b776 596ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
597 struct lttng_consumer_local_data *ctx)
598{
599 unsigned long len;
600 int err;
4078b776 601 ssize_t ret = 0;
d41f73b7
MD
602 int infd = stream->wait_fd;
603
604 DBG("In read_subbuffer (infd : %d)", infd);
605 /* Get the next subbuffer */
606 err = kernctl_get_next_subbuf(infd);
607 if (err != 0) {
d41f73b7
MD
608 /*
609 * This is a debug message even for single-threaded consumer,
610 * because poll() have more relaxed criterions than get subbuf,
611 * so get_subbuf may fail for short race windows where poll()
612 * would issue wakeups.
613 */
614 DBG("Reserving sub buffer failed (everything is normal, "
615 "it is due to concurrency)");
616 goto end;
617 }
618
619 switch (stream->output) {
620 case LTTNG_EVENT_SPLICE:
621 /* read the whole subbuffer */
622 err = kernctl_get_padded_subbuf_size(infd, &len);
623 if (err != 0) {
87dc6a9c 624 errno = -ret;
d41f73b7
MD
625 perror("Getting sub-buffer len failed.");
626 goto end;
627 }
628
629 /* splice the subbuffer to the tracefile */
630 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
47e81c02 631 if (ret != len) {
d41f73b7
MD
632 /*
633 * display the error but continue processing to try
634 * to release the subbuffer
635 */
00e2e675
DG
636 ERR("Error splicing to tracefile (ret: %ld != len: %ld)",
637 ret, len);
d41f73b7 638 }
47e81c02 639
d41f73b7
MD
640 break;
641 case LTTNG_EVENT_MMAP:
642 /* read the used subbuffer size */
643 err = kernctl_get_padded_subbuf_size(infd, &len);
644 if (err != 0) {
87dc6a9c 645 errno = -ret;
d41f73b7
MD
646 perror("Getting sub-buffer len failed.");
647 goto end;
648 }
649 /* write the subbuffer to the tracefile */
650 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
47e81c02 651 if (ret != len) {
d41f73b7
MD
652 /*
653 * display the error but continue processing to try
654 * to release the subbuffer
655 */
656 ERR("Error writing to tracefile");
657 }
658 break;
659 default:
660 ERR("Unknown output method");
661 ret = -1;
662 }
663
664 err = kernctl_put_next_subbuf(infd);
665 if (err != 0) {
87dc6a9c 666 errno = -ret;
d41f73b7
MD
667 if (errno == EFAULT) {
668 perror("Error in unreserving sub buffer\n");
669 } else if (errno == EIO) {
670 /* Should never happen with newer LTTng versions */
671 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
672 }
673 goto end;
674 }
675
676end:
677 return ret;
678}
679
680int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
681{
682 int ret;
683
684 /* Opening the tracefile in write mode */
00e2e675 685 if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
e11d277b 686 ret = run_as_open(stream->path_name,
60b6c79c
MD
687 O_WRONLY|O_CREAT|O_TRUNC,
688 S_IRWXU|S_IRWXG|S_IRWXO,
689 stream->uid, stream->gid);
d41f73b7
MD
690 if (ret < 0) {
691 ERR("Opening %s", stream->path_name);
692 perror("open");
693 goto error;
694 }
695 stream->out_fd = ret;
696 }
697
698 if (stream->output == LTTNG_EVENT_MMAP) {
699 /* get the len of the mmap region */
700 unsigned long mmap_len;
701
702 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
703 if (ret != 0) {
87dc6a9c 704 errno = -ret;
d41f73b7
MD
705 perror("kernctl_get_mmap_len");
706 goto error_close_fd;
707 }
708 stream->mmap_len = (size_t) mmap_len;
709
710 stream->mmap_base = mmap(NULL, stream->mmap_len,
711 PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
712 if (stream->mmap_base == MAP_FAILED) {
713 perror("Error mmaping");
714 ret = -1;
715 goto error_close_fd;
716 }
717 }
718
719 /* we return 0 to let the library handle the FD internally */
720 return 0;
721
722error_close_fd:
723 {
724 int err;
725
726 err = close(stream->out_fd);
727 assert(!err);
728 }
729error:
730 return ret;
731}
732
This page took 0.062873 seconds and 4 git commands to generate.