summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
7a45c7e)
The data pending checks are only performed on the sequence number of
the received data. However, it is expected that the index of the
stream (when applicable) has been written to disk by the time this
check returns that no data is pending.
This patch ensures that the minimum between the data and index
sequence numbers are used to perform this check.
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
return relay_index_set_data(index, &index_data);
}
return relay_index_set_data(index, &index_data);
}
+static bool session_streams_have_index(const struct relay_session *session)
+{
+ return session->minor >= 4 && !session->snapshot;
+}
+
/*
* Handle the RELAYD_CREATE_SESSION command.
*
/*
* Handle the RELAYD_CREATE_SESSION command.
*
struct relay_stream *stream;
ssize_t send_ret;
int ret;
struct relay_stream *stream;
ssize_t send_ret;
int ret;
DBG("Data pending command received");
DBG("Data pending command received");
pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&stream->lock);
- DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
- " and last_seq %" PRIu64, msg.stream_id,
- stream->prev_seq, msg.last_net_seq_num);
+ if (session_streams_have_index(session)) {
+ /*
+ * Ensure that both the index and stream data have been
+ * flushed up to the requested point.
+ */
+ stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+ } else {
+ stream_seq = stream->prev_seq;
+ }
+ DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64
+ ", prev_index_seq %" PRIu64
+ ", and last_seq %" PRIu64, msg.stream_id,
+ stream->prev_seq, stream->prev_index_seq,
+ msg.last_net_seq_num);
/* Avoid wrapping issue */
/* Avoid wrapping issue */
- if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) {
+ if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) {
/* Data has in fact been written and is NOT pending */
ret = 0;
} else {
/* Data has in fact been written and is NOT pending */
ret = 0;
} else {
}
pthread_mutex_lock(&stream->lock);
if (!stream->data_pending_check_done) {
}
pthread_mutex_lock(&stream->lock);
if (!stream->data_pending_check_done) {
- if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+ uint64_t stream_seq;
+
+ if (session_streams_have_index(conn->session)) {
+ /*
+ * Ensure that both the index and stream data have been
+ * flushed up to the requested point.
+ */
+ stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+ } else {
+ stream_seq = stream->prev_seq;
+ }
+ if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
is_data_inflight = 1;
DBG("Data is still in flight for stream %" PRIu64,
stream->stream_handle);
is_data_inflight = 1;
DBG("Data is still in flight for stream %" PRIu64,
stream->stream_handle);
- if (session->minor >= 4 && !session->snapshot) {
+ if (session_streams_have_index(session)) {
ret = handle_index_data(stream, state->header.net_seq_num,
state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
if (ret < 0) {
ret = handle_index_data(stream, state->header.net_seq_num,
state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
if (ret < 0) {