{
struct lttng_consumer_channel *new_channel;
int fds[1];
- size_t nb_fd = 1;
+ char *path;
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
return -EINTR;
}
- ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
- if (ret != sizeof(fds)) {
+
+ path = lttcomm_recv_string(sock);
+
+ if (!path) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
- return ret;
+ return -1;
+ }
+
+ DBG("consumer_add_channel received path %s", path);
+
+ fds[0] = open(path, O_RDWR);
+
+ if (fds[0] < 0) {
+ DBG("consumer_add_channel open error on path %s", path);
+ free(path);
+ return -1;
}
+ if (fcntl(fds[0], F_SETFD, FD_CLOEXEC) < 0) {
+ DBG("consumer_add_channel fcntl error");
+ free(path);
+ return -1;
+ }
+
+ free(path);
+
DBG("consumer_add_channel %d", msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
case LTTNG_CONSUMER_ADD_STREAM:
{
struct lttng_consumer_stream *new_stream;
- int fds[2];
- size_t nb_fd = 2;
+ int fds[2], i;
+ char *shm_path, *wait_pipe_path;
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
return -EINTR;
}
- ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
- if (ret != sizeof(fds)) {
+
+ shm_path = lttcomm_recv_string(sock);
+
+ if (!shm_path) {
+ lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ return -1;
+ }
+
+ wait_pipe_path = lttcomm_recv_string(sock);
+
+ if (!wait_pipe_path) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
- return ret;
+ free(shm_path);
+ return -1;
+ }
+
+ DBG("consumer_add_stream received path %s", shm_path);
+ DBG("consumer_add_stream received path %s", wait_pipe_path);
+
+ fds[0] = open(shm_path, O_RDWR);
+
+ if (fds[0] < 0) {
+ DBG("consumer_add_stream open error on path %s", shm_path);
+ free(shm_path);
+ free(wait_pipe_path);
+ return -1;
+ }
+
+ fds[1] = open(wait_pipe_path, O_RDONLY);
+
+ if (fds[1] < 0) {
+ DBG("consumer_add_stream open error on path %s", wait_pipe_path);
+ PERROR("open");
+ free(shm_path);
+ free(wait_pipe_path);
+ return -1;
+ }
+
+ free(shm_path);
+ free(wait_pipe_path);
+
+ for (i = 0; i < 2; ++i) {
+ if (fcntl(fds[i], F_SETFD, FD_CLOEXEC) < 0) {
+ DBG("consumer_add_stream fcntl error");
+ return -1;
+ }
}
DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
}
+int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ ssize_t readlen;
+ char dummy;
+
+ DBG("In check_pipe (wait_fd: %d, stream key: %d)\n",
+ stream->wait_fd, stream->key);
+
+ /* We consume the 1 byte written into the wait_fd by UST */
+ if (!stream->hangup_flush_done) {
+ do {
+ readlen = read(stream->wait_fd, &dummy, 1);
+ } while (readlen == -1 && errno == EINTR);
+ if (readlen == -1) {
+ return -1; /* error */
+ }
+ DBG("Read %zu byte from pipe: %c\n", readlen, dummy);
+ if (readlen == 0)
+ return 1; /* POLLHUP */
+ }
+ return 0; /* no error nor HUP */
+
+}
+
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
ret = readlen;
goto end;
}
+ DBG("Read %zu byte from pipe: %c\n", readlen, dummy);
}
buf = stream->buf;