X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-kconsumer%2Flttng-kconsumer.c;h=e9861f20fb6b2e11103dacfc8bf36c7e0b77c527;hp=40745178134ab89b4db597ad1ed4ef45e8151dff;hb=0e636c80aae070dcb89850990053bf3711283340;hpb=147466ee4c90f00375828135def25bc7f25baaee diff --git a/liblttng-kconsumer/lttng-kconsumer.c b/liblttng-kconsumer/lttng-kconsumer.c index 407451781..e9861f20f 100644 --- a/liblttng-kconsumer/lttng-kconsumer.c +++ b/liblttng-kconsumer/lttng-kconsumer.c @@ -200,7 +200,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -236,26 +236,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_STREAM: { struct lttng_consumer_stream *new_stream; - int fds[2]; - size_t nb_fd = 1; + int fd; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; } - ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); - if (ret != sizeof(fds)) { + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); return ret; } - if (nb_fd < 2) - fds[1] = fds[0]; /* duplicate same fd if recv only one */ - DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, - fds[0], fds[1]); + DBG("consumer_add_stream %s (%d)", msg.u.stream.path_name, + fd); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, - fds[0], fds[1], + fd, fd, msg.u.stream.state, msg.u.stream.mmap_len, msg.u.stream.output, @@ -303,3 +300,141 @@ end: end_nosignal: return 0; } + +/* + * Consume data on a file descriptor and write it on a trace file. + */ +int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + unsigned long len; + int err; + long ret = 0; + int infd = stream->wait_fd; + + DBG("In read_subbuffer (infd : %d)", infd); + /* Get the next subbuffer */ + err = kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + /* + * This is a debug message even for single-threaded consumer, + * because poll() have more relaxed criterions than get subbuf, + * so get_subbuf may fail for short race windows where poll() + * would issue wakeups. + */ + DBG("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + + switch (stream->output) { + case LTTNG_EVENT_SPLICE: + /* read the whole subbuffer */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + + /* splice the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile"); + } + break; + case LTTNG_EVENT_MMAP: + /* read the used subbuffer size */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + /* write the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error writing to tracefile"); + } + break; + default: + ERR("Unknown output method"); + ret = -1; + } + + err = kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + if (errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if (errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto end; + } + +end: + return ret; +} + +int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + /* Opening the tracefile in write mode */ + if (stream->path_name != NULL) { + ret = open(stream->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", stream->path_name); + perror("open"); + goto error; + } + stream->out_fd = ret; + } + + if (stream->output == LTTNG_EVENT_MMAP) { + /* get the len of the mmap region */ + unsigned long mmap_len; + + ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_len"); + goto error_close_fd; + } + stream->mmap_len = (size_t) mmap_len; + + stream->mmap_base = mmap(NULL, stream->mmap_len, + PROT_READ, MAP_PRIVATE, stream->wait_fd, 0); + if (stream->mmap_base == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto error_close_fd; + } + } + + /* we return 0 to let the library handle the FD internally */ + return 0; + +error_close_fd: + { + int err; + + err = close(stream->out_fd); + assert(!err); + } +error: + return ret; +} +