* this next value, 1 should always be substracted in order to compare
* the last seen sequence number on the relayd side to the last sent.
*/
- data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
goto error;
}
+ ++stream->next_net_seq_num;
+
/* Set to go on data socket */
outfd = relayd->data_sock.fd;
}
if (stream == NULL) {
/* Check for deleted streams. */
validate_endpoint_status_metadata_stream(&events);
- continue;
+ goto restart;
}
DBG("Adding metadata stream %d to poll set",
*/
pthread_mutex_lock(&consumer_data.lock);
if (consumer_data.need_update) {
- if (pollfd != NULL) {
- free(pollfd);
- pollfd = NULL;
- }
- if (local_stream != NULL) {
- free(local_stream);
- local_stream = NULL;
- }
+ free(pollfd);
+ pollfd = NULL;
+
+ free(local_stream);
+ local_stream = NULL;
/* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
- size_t pipe_readlen;
+ ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
}
end:
DBG("polling thread exiting");
- if (pollfd != NULL) {
- free(pollfd);
- pollfd = NULL;
- }
- if (local_stream != NULL) {
- free(local_stream);
- local_stream = NULL;
- }
+ free(pollfd);
+ free(local_stream);
/*
* Close the write side of the pipe so epoll_wait() in