//#define NUMPROCS 5 #define NUMPROCS 4 #define BUFSIZE 4 #define NR_SUBBUFS 2 #define SUBBUF_SIZE (BUFSIZE / NR_SUBBUFS) byte write_off = 0; byte read_off = 0; byte events_lost = 0; byte deliver = 0; // Number of subbuffers delivered byte refcount = 0; byte commit_count[NR_SUBBUFS]; byte buffer_use_count[BUFSIZE]; proctype switcher() { int prev_off, new_off, tmp_commit; int size; cmpxchg_loop: atomic { prev_off = write_off; size = SUBBUF_SIZE - (prev_off % SUBBUF_SIZE); new_off = prev_off + size; if :: new_off - read_off > BUFSIZE -> goto not_needed; :: else -> skip fi; } atomic { if :: prev_off != write_off -> goto cmpxchg_loop :: else -> write_off = new_off; fi; } atomic { tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size; commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit; if :: tmp_commit % SUBBUF_SIZE == 0 -> deliver++ :: else -> skip fi; } not_needed: skip; } proctype tracer(byte size) { int prev_off, new_off, tmp_commit; int i; int j; cmpxchg_loop: atomic { prev_off = write_off; new_off = prev_off + size; } atomic { if :: new_off - read_off > BUFSIZE -> goto lost :: else -> skip fi; } atomic { if :: prev_off != write_off -> goto cmpxchg_loop :: else -> write_off = new_off; fi; i = 0; do :: i < size -> buffer_use_count[(prev_off + i) % BUFSIZE] = buffer_use_count[(prev_off + i) % BUFSIZE] + 1; i++ :: i >= size -> break od; } do :: j < BUFSIZE -> assert(buffer_use_count[j] < 2); j++ :: j >= BUFSIZE -> break od; // writing to buffer... atomic { i = 0; do :: i < size -> buffer_use_count[(prev_off + i) % BUFSIZE] = buffer_use_count[(prev_off + i) % BUFSIZE] - 1; i++ :: i >= size -> break od; tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size; commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit; if :: tmp_commit % SUBBUF_SIZE == 0 -> deliver++ :: else -> skip fi; } goto end; lost: events_lost++; end: refcount = refcount - 1; } proctype reader() { int i; int j; //atomic { // do // :: (deliver == (NUMPROCS - events_lost) / SUBBUF_SIZE) -> break; // od; //} do :: (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) > 0 && commit_count[(read_off % BUFSIZE) / SUBBUF_SIZE] % SUBBUF_SIZE == 0 -> atomic { i = 0; do :: i < SUBBUF_SIZE -> buffer_use_count[(read_off + i) % BUFSIZE] = buffer_use_count[(read_off + i) % BUFSIZE] + 1; i++ :: i >= SUBBUF_SIZE -> break od; } j = 0; do :: j < BUFSIZE -> assert(buffer_use_count[j] < 2); j++ :: j >= BUFSIZE -> break od; // reading from buffer... atomic { i = 0; do :: i < SUBBUF_SIZE -> buffer_use_count[(read_off + i) % BUFSIZE] = buffer_use_count[(read_off + i) % BUFSIZE] - 1; i++ :: i >= SUBBUF_SIZE -> break od; read_off = read_off + SUBBUF_SIZE; } :: read_off >= (NUMPROCS - events_lost) -> break; od; } proctype cleaner() { atomic { do :: refcount == 0 -> run switcher(); break; od; } } init { int i = 0; int j = 0; int sum = 0; int commit_sum = 0; atomic { i = 0; do :: i < NR_SUBBUFS -> commit_count[i] = 0; i++ :: i >= NR_SUBBUFS -> break od; i = 0; do :: i < BUFSIZE -> buffer_use_count[i] = 0; i++ :: i >= BUFSIZE -> break od; run reader(); run cleaner(); i = 0; do :: i < NUMPROCS -> refcount = refcount + 1; run tracer(1); i++ :: i >= NUMPROCS -> break od; run switcher(); } atomic { assert(write_off - read_off >= 0); j = 0; commit_sum = 0; do :: j < NR_SUBBUFS -> commit_sum = commit_sum + commit_count[j]; j++ :: j >= NR_SUBBUFS -> break od; assert(commit_sum <= write_off); j = 0; do :: j < BUFSIZE -> assert(buffer_use_count[j] < 2); j++ :: j >= BUFSIZE -> break od; //assert(events_lost == 0); } }