X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=a73b4852d574be48f25f2184e2ae793ac46e85d8;hb=4cec016f4a1cb76ec3d917c2d261c4081910a65a;hp=009621a6f7b1c4701a0857848346f29c66b8511a;hpb=beaad64cecee395058e37c8b33dc50af99d771a4;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 009621a6f..a73b4852d 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -254,7 +254,7 @@ int notify_thread_pipe(int wpipe) do { ret = write(wpipe, "!", 1); } while (ret < 0 && errno == EINTR); - if (ret < 0) { + if (ret < 0 || ret != 1) { PERROR("write poll pipe"); } @@ -669,7 +669,7 @@ void *relay_thread_dispatcher(void *data) sizeof(struct relay_command)); } while (ret < 0 && errno == EINTR); free(relay_cmd); - if (ret < 0) { + if (ret < 0 || ret != sizeof(struct relay_command)) { PERROR("write cmd pipe"); goto error; } @@ -987,7 +987,12 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info, sizeof(struct lttcomm_relayd_add_stream), 0); if (ret < sizeof(struct lttcomm_relayd_add_stream)) { - ERR("Relay didn't receive valid add_stream struct size : %d", ret); + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid add_stream struct size : %d", ret); + } ret = -1; goto end_no_session; } @@ -1083,7 +1088,12 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info, sizeof(struct lttcomm_relayd_close_stream), 0); if (ret < sizeof(struct lttcomm_relayd_close_stream)) { - ERR("Relay didn't receive valid add_stream struct size : %d", ret); + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid add_stream struct size : %d", ret); + } ret = -1; goto end_no_session; } @@ -1234,7 +1244,7 @@ static int write_padding_to_file(int fd, uint32_t size) do { ret = write(fd, zeros, size); } while (ret < 0 && errno == EINTR); - if (ret < 0) { + if (ret < 0 || ret != size) { PERROR("write padding to file"); } @@ -1289,8 +1299,13 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, 0); if (ret < 0 || ret != data_size) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive the whole metadata"); + } ret = -1; - ERR("Relay didn't receive the whole metadata"); goto end; } metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer; @@ -1307,7 +1322,7 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, ret = write(metadata_stream->fd, metadata_struct->payload, payload_size); } while (ret < 0 && errno == EINTR); - if (ret < payload_size) { + if (ret < 0 || ret != payload_size) { ERR("Relay error writing metadata on file"); ret = -1; goto end_unlock; @@ -1344,8 +1359,13 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, /* Get version from the other side. */ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); if (ret < 0 || ret != sizeof(msg)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay failed to receive the version values."); + } ret = -1; - ERR("Relay failed to receive the version values."); goto end; } @@ -1402,7 +1422,13 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); if (ret < sizeof(msg)) { - ERR("Relay didn't receive valid data_pending struct size : %d", ret); + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid data_pending struct size : %d", + ret); + } ret = -1; goto end_no_session; } @@ -1427,7 +1453,7 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, last_net_seq_num); /* Avoid wrapping issue */ - if (((int64_t) (stream->prev_seq - last_net_seq_num)) <= 0) { + if (((int64_t) (stream->prev_seq - last_net_seq_num)) >= 0) { /* Data has in fact been written and is NOT pending */ ret = 0; } else { @@ -1460,19 +1486,56 @@ end_no_session: */ static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd) + struct relay_command *cmd, struct lttng_ht *streams_ht) { int ret; + uint64_t stream_id; + struct relay_stream *stream; + struct lttng_ht_iter iter; + struct lttcomm_relayd_quiescent_control msg; struct lttcomm_relayd_generic_reply reply; DBG("Checking quiescent state on control socket"); + if (!cmd->session || cmd->version_check_done == 0) { + ERR("Trying to check for data before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); + if (ret < sizeof(msg)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid begin data_pending struct size: %d", + ret); + } + ret = -1; + goto end_no_session; + } + + stream_id = be64toh(msg.stream_id); + + rcu_read_lock(); + cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) { + if (stream->stream_handle == stream_id) { + stream->data_pending_check_done = 1; + DBG("Relay quiescent control pending flag set to %" PRIu64, + stream_id); + break; + } + } + rcu_read_unlock(); + reply.ret_code = htobe32(LTTNG_OK); ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); if (ret < 0) { ERR("Relay data quiescent control ret code failed"); } +end_no_session: return ret; } @@ -1508,8 +1571,13 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); if (ret < sizeof(msg)) { - ERR("Relay didn't receive valid begin data_pending struct size: %d", - ret); + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid begin data_pending struct size: %d", + ret); + } ret = -1; goto end_no_session; } @@ -1578,8 +1646,13 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); if (ret < sizeof(msg)) { - ERR("Relay didn't receive valid end data_pending struct size: %d", - ret); + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid end data_pending struct size: %d", + ret); + } ret = -1; goto end_no_session; } @@ -1643,7 +1716,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_data_pending(recv_hdr, cmd, streams_ht); break; case RELAYD_QUIESCENT_CONTROL: - ret = relay_quiescent_control(recv_hdr, cmd); + ret = relay_quiescent_control(recv_hdr, cmd, streams_ht); break; case RELAYD_BEGIN_DATA_PENDING: ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht); @@ -1679,7 +1752,12 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); if (ret <= 0) { - ERR("Connections seems to be closed"); + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Unable to receive data header on sock %d", cmd->sock->fd); + } ret = -1; goto end; } @@ -1715,6 +1793,10 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) data_size, stream_id, net_seq_num); ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, 0); if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } ret = -1; goto end_unlock; } @@ -1722,7 +1804,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) do { ret = write(stream->fd, data_buffer, data_size); } while (ret < 0 && errno == EINTR); - if (ret < data_size) { + if (ret < 0 || ret != data_size) { ERR("Relay error writing data to file"); ret = -1; goto end_unlock; @@ -1786,7 +1868,9 @@ int relay_add_connection(int fd, struct lttng_poll_event *events, PERROR("Relay command zmalloc"); goto error; } - ret = read(fd, relay_connection, sizeof(struct relay_command)); + do { + ret = read(fd, relay_connection, sizeof(struct relay_command)); + } while (ret < 0 && errno == EINTR); if (ret < 0 || ret < sizeof(struct relay_command)) { PERROR("read relay cmd pipe"); goto error_read;