ret = -1;
goto end;
}
+
+ /*
+ * Set last_net_seq_num before the close flag. Required by data
+ * pending check.
+ */
pthread_mutex_lock(&stream->lock);
- stream->closed = true;
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+ pthread_mutex_unlock(&stream->lock);
+
+ /*
+ * This is one of the conditions which may trigger a stream close
+ * with the others being:
+ * 1) A close command is received for a stream
+ * 2) The control connection owning the stream is closed
+ * 3) We have received all of the stream's data _after_ a close
+ * request.
+ */
+ try_stream_close(stream);
if (stream->is_metadata) {
struct relay_viewer_stream *vstream;
viewer_stream_put(vstream);
}
}
- pthread_mutex_unlock(&stream->lock);
stream_put(stream);
end:
uint64_t net_seq_num;
uint32_t data_size;
struct relay_session *session;
- bool new_stream = false;
+ bool new_stream = false, close_requested = false;
ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
sizeof(struct lttcomm_relayd_data_hdr), 0);
stream->prev_seq = net_seq_num;
end_stream_unlock:
+ close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
+ if (close_requested) {
+ try_stream_close(stream);
+ }
+
if (new_stream) {
pthread_mutex_lock(&session->lock);
uatomic_set(&session->new_streams, 1);