struct lttng_consumer_stream *stream, unsigned long len)
{
unsigned long mmap_offset;
- long ret = 0;
+ long ret = 0, written = 0;
off_t orig_offset = stream->out_fd_offset;
int outfd = stream->out_fd;
if (ret != 0) {
errno = -ret;
PERROR("ustctl_get_mmap_read_offset");
+ written = ret;
goto end;
}
while (len > 0) {
ret = write(outfd, stream->mmap_base + mmap_offset, len);
- if (ret >= len) {
- len = 0;
- } else if (ret < 0) {
- errno = -ret;
+ if (ret < 0) {
+ if (errno == EINTR) {
+ /* restart the interrupted system call */
+ continue;
+ } else {
+ PERROR("Error in file write");
+ if (written == 0) {
+ written = ret;
+ }
+ goto end;
+ }
+ } else if (ret > len) {
PERROR("Error in file write");
+ written += ret;
goto end;
+ } else {
+ len -= ret;
+ mmap_offset += ret;
}
/* This won't block, but will start writeout asynchronously */
lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
+ written += ret;
}
-
lttng_consumer_sync_trace_file(stream, orig_offset);
-
- goto end;
-
end:
- return ret;
+ return written;
}
/*
{
struct lttng_consumer_channel *new_channel;
int fds[1];
- size_t nb_fd = 1;
+ char *path;
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
return -EINTR;
}
- ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
- if (ret != sizeof(fds)) {
+
+ path = lttcomm_recv_string(sock);
+
+ if (!path) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
- return ret;
+ return -1;
+ }
+
+ DBG("consumer_add_channel received path %s", path);
+
+ fds[0] = open(path, O_RDWR);
+
+ if (fds[0] < 0) {
+ DBG("consumer_add_channel open error on path %s", path);
+ free(path);
+ return -1;
}
+ if (fcntl(fds[0], F_SETFD, FD_CLOEXEC) < 0) {
+ DBG("consumer_add_channel fcntl error");
+ free(path);
+ return -1;
+ }
+
+ free(path);
+
DBG("consumer_add_channel %d", msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
case LTTNG_CONSUMER_ADD_STREAM:
{
struct lttng_consumer_stream *new_stream;
- int fds[2];
- size_t nb_fd = 2;
+ int fds[2], i;
+ char *shm_path, *wait_pipe_path;
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
return -EINTR;
}
- ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
- if (ret != sizeof(fds)) {
+
+ shm_path = lttcomm_recv_string(sock);
+
+ if (!shm_path) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
- return ret;
+ return -1;
+ }
+
+ wait_pipe_path = lttcomm_recv_string(sock);
+
+ if (!wait_pipe_path) {
+ lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ free(shm_path);
+ return -1;
+ }
+
+ DBG("consumer_add_stream received path %s", shm_path);
+ DBG("consumer_add_stream received path %s", wait_pipe_path);
+
+ fds[0] = open(shm_path, O_RDWR);
+
+ if (fds[0] < 0) {
+ DBG("consumer_add_stream open error on path %s", shm_path);
+ free(shm_path);
+ free(wait_pipe_path);
+ return -1;
+ }
+
+ fds[1] = open(wait_pipe_path, O_RDONLY);
+
+ if (fds[1] < 0) {
+ DBG("consumer_add_stream open error on path %s", wait_pipe_path);
+ PERROR("open");
+ free(shm_path);
+ free(wait_pipe_path);
+ return -1;
+ }
+
+ free(shm_path);
+ free(wait_pipe_path);
+
+ for (i = 0; i < 2; ++i) {
+ if (fcntl(fds[i], F_SETFD, FD_CLOEXEC) < 0) {
+ DBG("consumer_add_stream fcntl error");
+ return -1;
+ }
}
DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
break;
}
end:
- /* signal the poll thread */
- ret = write(ctx->consumer_poll_pipe[1], "4", 1);
- if (ret < 0) {
- PERROR("write consumer poll");
- }
+ /*
+ * Wake-up the other end by writing a null byte in the pipe
+ * (non-blocking). Important note: Because writing into the
+ * pipe is non-blocking (and therefore we allow dropping wakeup
+ * data, as long as there is wakeup data present in the pipe
+ * buffer to wake up the other end), the other end should
+ * perform the following sequence for waiting:
+ * 1) empty the pipe (reads).
+ * 2) perform update operation.
+ * 3) wait on the pipe (poll).
+ */
+ do {
+ ret = write(ctx->consumer_poll_pipe[1], "", 1);
+ } while (ret == -1UL && errno == EINTR);
end_nosignal:
- return 0;
+
+ /*
+ * Return 1 to indicate success since the 0 value can be a socket shutdown
+ * during the recv() or send() call.
+ */
+ return 1;
}
int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
assert(err == 0);
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
- if (ret < 0) {
+ if (ret != len) {
/*
* display the error but continue processing to try
* to release the subbuffer