health_code_update();
pthread_mutex_lock(&stream->lock);
+
+ /*
+ * Protect against concurrent teardown of a stream.
+ */
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
if (!stream->quiescent) {
ustctl_flush_buffer(stream->ustream, 0);
stream->quiescent = true;
}
+next:
pthread_mutex_unlock(&stream->lock);
}
error:
produced_pos, nb_packets_per_stream,
stream->max_sb_size);
- while (consumed_pos < produced_pos) {
+ while ((long) (consumed_pos - produced_pos) < 0) {
ssize_t read_len;
unsigned long len, padded_len;