X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=1045bfb46101d8a6bb63a8c91157f3ffbf5199a2;hp=1af69ef93015c7e397c8028781dc3c97895c1790;hb=534d2592c0566f7c07cd9e9f9385781a65566e16;hpb=23f5f35da2bfb8e5162419bf84627714f920da2c diff --git a/src/common/consumer.c b/src/common/consumer.c index 1af69ef93..1045bfb46 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -782,6 +782,13 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); data_hdr.padding_size = htobe32(padding); + /* + * Note that net_seq_num below is assigned with the *current* value of + * next_net_seq_num and only after that the next_net_seq_num will be + * increment. This is why when issuing a command on the relayd using + * this next value, 1 should always be substracted in order to compare + * the last seen sequence number on the relayd side to the last sent. + */ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ @@ -1099,7 +1106,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) do { ret = write(ctx->consumer_should_quit[1], "4", 1); } while (ret < 0 && errno == EINTR); - if (ret < 0) { + if (ret < 0 || ret != 1) { PERROR("write consumer quit"); } @@ -1317,8 +1324,22 @@ static int write_relayd_metadata_id(int fd, do { ret = write(fd, (void *) &hdr, sizeof(hdr)); } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata stream id"); + if (ret < 0 || ret != sizeof(hdr)) { + /* + * This error means that the fd's end is closed so ignore the perror + * not to clubber the error output since this can happen in a normal + * code path. + */ + if (errno != EPIPE) { + PERROR("write metadata stream id"); + } + DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno); + /* + * Set ret to a negative value because if ret != sizeof(hdr), we don't + * handle writting the missing part so report that as an error and + * don't lie to the caller. + */ + ret = -1; goto end; } DBG("Metadata stream id %" PRIu64 " with padding %lu written before data", @@ -1435,7 +1456,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } while (ret < 0 && errno == EINTR); DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret < 0) { - PERROR("Error in file write"); + /* + * This is possible if the fd is closed on the other side (outfd) + * or any write problem. It can be verbose a bit for a normal + * execution if for instance the relayd is stopped abruptly. This + * can happen so set this to a DBG statement. + */ + DBG("Error in file write mmap"); if (written == 0) { written = ret; } @@ -2576,7 +2603,7 @@ void *consumer_thread_sessiond_poll(void *data) /* Blocking call, waiting for transmission */ sock = lttcomm_accept_unix_sock(client_socket); - if (sock <= 0) { + if (sock < 0) { WARN("On accept"); goto end; } @@ -2738,6 +2765,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); if (relayd == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + ret = -1; goto error; } relayd->sessiond_session_id = (uint64_t) sessiond_id; @@ -2787,11 +2815,17 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, relayd->control_sock.fd = fd; /* - * Create a session on the relayd and store the returned id. No need to - * grab the socket lock since the relayd object is not yet visible. + * Create a session on the relayd and store the returned id. Lock the + * control socket mutex if the relayd was NOT created before. */ + if (!relayd_created) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } ret = relayd_create_session(&relayd->control_sock, &relayd->relayd_session_id); + if (!relayd_created) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } if (ret < 0) { goto error; } @@ -2800,6 +2834,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id)); if (!relayd_id_node) { PERROR("zmalloc relayd id node"); + ret = -1; goto error; } @@ -2834,6 +2869,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, break; default: ERR("Unknown relayd socket type (%d)", sock_type); + ret = -1; goto error; } @@ -3015,7 +3051,8 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id); } else { ret = relayd_data_pending(&relayd->control_sock, - stream->relayd_stream_id, stream->next_net_seq_num); + stream->relayd_stream_id, + stream->next_net_seq_num - 1); } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) {