From 7c5aef6226a4752f3a4e60cd0b52c741dced395e Mon Sep 17 00:00:00 2001 From: David Goulet Date: Wed, 12 Dec 2012 13:39:37 -0500 Subject: [PATCH] Remove MSG_WAITALL on every recvmsg() socket type In order to handle messages that are possibly larger than the socket buffer size set by wmem_max and rmem_max /proc files, ensure that the recv-side reads the data chunk-wise rather than hanging on a MSG_WAITALL. In addition to fixing this issue, chances are that it will also help fixing hangs detected due to UNIX socket buffers filling up. The MSG_WAITALL behavior in such situations might be unexpected. Acked-by: Mathieu Desnoyers Signed-off-by: David Goulet --- src/bin/lttng-relayd/main.c | 17 ++++++++--------- src/common/sessiond-comm/inet.c | 16 +++++++++++----- src/common/sessiond-comm/inet6.c | 18 ++++++++++++------ src/common/sessiond-comm/unix.c | 14 ++++++++++++-- 4 files changed, 43 insertions(+), 22 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index bd19a3237..2a7152b89 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -932,7 +932,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, /* FIXME : use data_size for something ? */ ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info, - sizeof(struct lttcomm_relayd_add_stream), MSG_WAITALL); + sizeof(struct lttcomm_relayd_add_stream), 0); if (ret < sizeof(struct lttcomm_relayd_add_stream)) { ERR("Relay didn't receive valid add_stream struct size : %d", ret); ret = -1; @@ -1028,7 +1028,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, } ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info, - sizeof(struct lttcomm_relayd_close_stream), MSG_WAITALL); + sizeof(struct lttcomm_relayd_close_stream), 0); if (ret < sizeof(struct lttcomm_relayd_close_stream)) { ERR("Relay didn't receive valid add_stream struct size : %d", ret); ret = -1; @@ -1232,8 +1232,7 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, } memset(data_buffer, 0, data_size); DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); - ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, - MSG_WAITALL); + ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, 0); if (ret < 0 || ret != data_size) { ret = -1; ERR("Relay didn't receive the whole metadata"); @@ -1300,7 +1299,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, session->version_check_done = 1; /* Get version from the other side. */ - ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), MSG_WAITALL); + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); if (ret < 0 || ret != sizeof(msg)) { ret = -1; ERR("Relay failed to receive the version values."); @@ -1358,7 +1357,7 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), MSG_WAITALL); + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); if (ret < sizeof(msg)) { ERR("Relay didn't receive valid data_pending struct size : %d", ret); ret = -1; @@ -1493,7 +1492,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) uint32_t data_size; ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr, - sizeof(struct lttcomm_relayd_data_hdr), MSG_WAITALL); + sizeof(struct lttcomm_relayd_data_hdr), 0); if (ret <= 0) { ERR("Connections seems to be closed"); ret = -1; @@ -1527,7 +1526,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, data_size, stream_id, net_seq_num); - ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL); + ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, 0); if (ret <= 0) { ret = -1; goto end_unlock; @@ -1767,7 +1766,7 @@ void *relay_thread_worker(void *data) if (relay_connection->type == RELAY_CONTROL) { ret = relay_connection->sock->ops->recvmsg( relay_connection->sock, &recv_hdr, - sizeof(struct lttcomm_relayd_hdr), MSG_WAITALL); + sizeof(struct lttcomm_relayd_hdr), 0); /* connection closed */ if (ret <= 0) { relay_cleanup_poll_connection(&events, pollfd); diff --git a/src/common/sessiond-comm/inet.c b/src/common/sessiond-comm/inet.c index f9ea17dbf..7aee72560 100644 --- a/src/common/sessiond-comm/inet.c +++ b/src/common/sessiond-comm/inet.c @@ -207,6 +207,7 @@ ssize_t lttcomm_recvmsg_inet_sock(struct lttcomm_sock *sock, void *buf, struct msghdr msg; struct iovec iov[1]; ssize_t ret = -1; + size_t len_last; memset(&msg, 0, sizeof(msg)); @@ -218,16 +219,21 @@ ssize_t lttcomm_recvmsg_inet_sock(struct lttcomm_sock *sock, void *buf, msg.msg_name = (struct sockaddr *) &sock->sockaddr.addr.sin; msg.msg_namelen = sizeof(sock->sockaddr.addr.sin); - if (flags == 0) { - flags = MSG_WAITALL; - } - do { + len_last = iov[0].iov_len; ret = recvmsg(sock->fd, &msg, flags); - } while (ret < 0 && errno == EINTR); + if (ret > 0) { + iov[0].iov_base += ret; + iov[0].iov_len -= ret; + assert(ret <= len_last); + } + } while ((ret > 0 && ret < len_last) || (ret < 0 && errno == EINTR)); if (ret < 0) { PERROR("recvmsg inet"); + } else if (ret > 0) { + ret = len; } + /* Else ret = 0 meaning an orderly shutdown. */ return ret; } diff --git a/src/common/sessiond-comm/inet6.c b/src/common/sessiond-comm/inet6.c index 024beace3..ddbb50ed4 100644 --- a/src/common/sessiond-comm/inet6.c +++ b/src/common/sessiond-comm/inet6.c @@ -207,6 +207,7 @@ ssize_t lttcomm_recvmsg_inet6_sock(struct lttcomm_sock *sock, void *buf, struct msghdr msg; struct iovec iov[1]; ssize_t ret = -1; + size_t len_last; memset(&msg, 0, sizeof(msg)); @@ -218,16 +219,21 @@ ssize_t lttcomm_recvmsg_inet6_sock(struct lttcomm_sock *sock, void *buf, msg.msg_name = (struct sockaddr *) &sock->sockaddr.addr.sin6; msg.msg_namelen = sizeof(sock->sockaddr.addr.sin6); - if (flags == 0) { - flags = MSG_WAITALL; - } - do { + len_last = iov[0].iov_len; ret = recvmsg(sock->fd, &msg, flags); - } while (ret < 0 && errno == EINTR); + if (ret > 0) { + iov[0].iov_base += ret; + iov[0].iov_len -= ret; + assert(ret <= len_last); + } + } while ((ret > 0 && ret < len_last) || (ret < 0 && errno == EINTR)); if (ret < 0) { - PERROR("recvmsg inet6"); + PERROR("recvmsg inet"); + } else if (ret > 0) { + ret = len; } + /* Else ret = 0 meaning an orderly shutdown. */ return ret; } diff --git a/src/common/sessiond-comm/unix.c b/src/common/sessiond-comm/unix.c index 2f79f3a28..bbf030f67 100644 --- a/src/common/sessiond-comm/unix.c +++ b/src/common/sessiond-comm/unix.c @@ -157,6 +157,7 @@ ssize_t lttcomm_recv_unix_sock(int sock, void *buf, size_t len) struct msghdr msg; struct iovec iov[1]; ssize_t ret = -1; + size_t len_last; memset(&msg, 0, sizeof(msg)); @@ -166,11 +167,20 @@ ssize_t lttcomm_recv_unix_sock(int sock, void *buf, size_t len) msg.msg_iovlen = 1; do { - ret = recvmsg(sock, &msg, MSG_WAITALL); - } while (ret < 0 && errno == EINTR); + len_last = iov[0].iov_len; + ret = recvmsg(sock, &msg, 0); + if (ret > 0) { + iov[0].iov_base += ret; + iov[0].iov_len -= ret; + assert(ret <= len_last); + } + } while ((ret > 0 && ret < len_last) || (ret < 0 && errno == EINTR)); if (ret < 0) { PERROR("recvmsg"); + } else if (ret > 0) { + ret = len; } + /* Else ret = 0 meaning an orderly shutdown. */ return ret; } -- 2.34.1