X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=97b890bf25340ba885f65a5c394d74de187126dd;hb=6684bfa4a76f566ff024d4f2159f59f2e783d8c2;hp=2b55fd4637ead2b056fcafecc47ef5504d7c5df5;hpb=5a4b955ed97bb0802977e50f1aa409f0bb4729ec;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 2b55fd463..97b890bf2 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -49,7 +49,7 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( struct lttng_consumer_stream *stream, unsigned long len) { unsigned long mmap_offset; - long ret = 0; + long ret = 0, written = 0; off_t orig_offset = stream->out_fd_offset; int outfd = stream->out_fd; @@ -59,29 +59,39 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( if (ret != 0) { errno = -ret; PERROR("ustctl_get_mmap_read_offset"); + written = ret; goto end; } while (len > 0) { ret = write(outfd, stream->mmap_base + mmap_offset, len); - if (ret >= len) { - len = 0; - } else if (ret < 0) { - errno = -ret; + if (ret < 0) { + if (errno == EINTR) { + /* restart the interrupted system call */ + continue; + } else { + PERROR("Error in file write"); + if (written == 0) { + written = ret; + } + goto end; + } + } else if (ret > len) { PERROR("Error in file write"); + written += ret; goto end; + } else { + len -= ret; + mmap_offset += ret; } /* This won't block, but will start writeout asynchronously */ lttng_sync_file_range(outfd, stream->out_fd_offset, ret, SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += ret; + written += ret; } - lttng_consumer_sync_trace_file(stream, orig_offset); - - goto end; - end: - return ret; + return written; } /* @@ -157,18 +167,38 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { struct lttng_consumer_channel *new_channel; int fds[1]; - size_t nb_fd = 1; + char *path; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; } - ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); - if (ret != sizeof(fds)) { + + path = lttcomm_recv_string(sock); + + if (!path) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); - return ret; + return -1; + } + + DBG("consumer_add_channel received path %s", path); + + fds[0] = open(path, O_RDWR); + + if (fds[0] < 0) { + DBG("consumer_add_channel open error on path %s", path); + free(path); + return -1; } + if (fcntl(fds[0], F_SETFD, FD_CLOEXEC) < 0) { + DBG("consumer_add_channel fcntl error"); + free(path); + return -1; + } + + free(path); + DBG("consumer_add_channel %d", msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, @@ -194,17 +224,59 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_STREAM: { struct lttng_consumer_stream *new_stream; - int fds[2]; - size_t nb_fd = 2; + int fds[2], i; + char *shm_path, *wait_pipe_path; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; } - ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); - if (ret != sizeof(fds)) { + + shm_path = lttcomm_recv_string(sock); + + if (!shm_path) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + return -1; + } + + wait_pipe_path = lttcomm_recv_string(sock); + + if (!wait_pipe_path) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); - return ret; + free(shm_path); + return -1; + } + + DBG("consumer_add_stream received path %s", shm_path); + DBG("consumer_add_stream received path %s", wait_pipe_path); + + fds[0] = open(shm_path, O_RDWR); + + if (fds[0] < 0) { + DBG("consumer_add_stream open error on path %s", shm_path); + free(shm_path); + free(wait_pipe_path); + return -1; + } + + fds[1] = open(wait_pipe_path, O_RDONLY); + + if (fds[1] < 0) { + DBG("consumer_add_stream open error on path %s", wait_pipe_path); + PERROR("open"); + free(shm_path); + free(wait_pipe_path); + return -1; + } + + free(shm_path); + free(wait_pipe_path); + + for (i = 0; i < 2; ++i) { + if (fcntl(fds[i], F_SETFD, FD_CLOEXEC) < 0) { + DBG("consumer_add_stream fcntl error"); + return -1; + } } DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, @@ -272,7 +344,12 @@ end: ret = write(ctx->consumer_poll_pipe[1], "", 1); } while (ret == -1UL && errno == EINTR); end_nosignal: - return 0; + + /* + * Return 1 to indicate success since the 0 value can be a socket shutdown + * during the recv() or send() call. + */ + return 1; } int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) @@ -384,7 +461,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, assert(err == 0); /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret < 0) { + if (ret != len) { /* * display the error but continue processing to try * to release the subbuffer