// Promela (SPIN) model of writers reserving and committing space in a
// buffer of BUFSIZE slots split into NR_SUBBUFS sub-buffers, with one reader.
//
// Shared state:
//   write_off    - offset up to which space has been reserved by writers
//   read_off     - offset up to which the reader has consumed
//   events_lost  - number of tracer reservations that were refused
//   deliver      - number of sub-buffers delivered
//   refcount     - number of tracer processes that have not finished yet
//   commit_count - per sub-buffer running total of committed slots
//
// NOTE(review): offsets only ever grow in this model; they are reduced
// modulo BUFSIZE only when indexing commit_count.

//#define NUMPROCS 5
#define NUMPROCS 4
#define BUFSIZE 4
#define NR_SUBBUFS 2
#define SUBBUF_SIZE (BUFSIZE / NR_SUBBUFS)

byte write_off = 0;
byte read_off = 0;
byte events_lost = 0;
byte deliver = 0;	// Number of subbuffers delivered
byte refcount = 0;

byte commit_count[NR_SUBBUFS];

// Reserves whatever is left of the sub-buffer that write_off currently
// points into (a whole sub-buffer when write_off is already aligned) and
// commits it immediately. Gives up without changing anything when the
// reservation would put more than BUFSIZE slots between read_off and the
// new write offset.
proctype switcher()
{
	int prev_off, new_off, tmp_commit;
	int size;

cmpxchg_loop:
	// Step 1: snapshot write_off and compute the padding size.
	atomic {
		prev_off = write_off;
		size = SUBBUF_SIZE - (prev_off % SUBBUF_SIZE);
		new_off = prev_off + size;
		if
		:: new_off - read_off > BUFSIZE ->
			goto not_needed;
		:: else -> skip
		fi;
	}
	// Step 2: publish new_off only if write_off is still the value read in
	// step 1; otherwise another process moved it, so start over.
	atomic {
		if
		:: prev_off != write_off -> goto cmpxchg_loop
		:: else -> write_off = new_off;
		fi;
	}

	// Step 3: commit the reserved slots in the sub-buffer that prev_off
	// falls into, and count a delivery when its total reaches a multiple
	// of SUBBUF_SIZE.
	atomic {
		tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
		commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
		if
		:: tmp_commit % SUBBUF_SIZE == 0 -> deliver++
		:: else -> skip
		fi;
	}
not_needed:
	skip;
}

// Writer process: reserves `size` slots, commits them, then drops its
// reference on refcount. When the reservation would put more than BUFSIZE
// slots between read_off and the new write offset, the event is counted in
// events_lost instead.
//
// size: number of slots to reserve and commit.
proctype tracer(byte size)
{
	int prev_off, new_off, tmp_commit;

cmpxchg_loop:
	// Snapshot write_off and compute the wanted new offset.
	atomic {
		prev_off = write_off;
		new_off = prev_off + size;
	}
	// The space check is a separate step from the snapshot above, so
	// read_off may have advanced in between.
	atomic {
		if
		:: new_off - read_off > BUFSIZE -> goto lost
		:: else -> skip
		fi;
	}
	// Publish new_off only if write_off is unchanged since the snapshot;
	// otherwise retry from the top.
	atomic {
		if
		:: prev_off != write_off -> goto cmpxchg_loop
		:: else -> write_off = new_off;
		fi;
	}

	// Commit into the sub-buffer that prev_off falls into; count a
	// delivery when its total reaches a multiple of SUBBUF_SIZE.
	atomic {
		tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
		commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
		if
		:: tmp_commit % SUBBUF_SIZE == 0 -> deliver++
		:: else -> skip
		fi;
	}
	goto end;
lost:
	events_lost++;
	// NOTE(review): SPIN treats labels starting with "end" as valid
	// end-state markers; confirm that is intended for this label.
end:
	refcount = refcount - 1;
}

// Consumer process: advances read_off one sub-buffer at a time.
proctype reader()
{
	//atomic {
	//	do
	//	:: (deliver == (NUMPROCS - events_lost) / SUBBUF_SIZE) -> break;
	//	od;
	//}
	// made atomic to use less memory. Not really true.
	atomic {
		do
		// Consume a sub-buffer when at least SUBBUF_SIZE slots are
		// reserved past read_off and the commit count of the sub-buffer
		// at read_off is a multiple of SUBBUF_SIZE.
		:: write_off - read_off >= SUBBUF_SIZE && commit_count[(read_off % BUFSIZE) / SUBBUF_SIZE] % SUBBUF_SIZE == 0 ->
			read_off = read_off + SUBBUF_SIZE;
		// Stop once read_off has reached the number of events that were
		// not lost.
		:: read_off >= (NUMPROCS - events_lost) -> break;
		od;
	}
}

// Blocks until every tracer has dropped its reference (refcount == 0),
// then starts one more switcher and terminates.
proctype cleaner()
{
	atomic {
		do
		:: refcount == 0 ->
			run switcher();
			break;
		od;
	}
}

// Sets up the model and checks two assertions on the shared state.
init {
	int i = 0;
	int j = 0;
	int commit_sum = 0;

	// Setup: clear commit_count, then start the reader, the cleaner,
	// NUMPROCS tracers of size 1 (taking one reference each) and a
	// switcher. refcount is incremented before each tracer is started, in
	// the same atomic sequence that starts the cleaner.
	atomic {
		i = 0;
		do
		:: i < NR_SUBBUFS ->
			commit_count[i] = 0;
			i++
		:: i >= NR_SUBBUFS -> break
		od;
		run reader();
		run cleaner();
		i = 0;
		do
		:: i < NUMPROCS ->
			refcount = refcount + 1;
			run tracer(1);
			i++
		:: i >= NUMPROCS -> break
		od;
		run switcher();
	}
	// Checks: the reader never passes the writers, and the total committed
	// never exceeds the total reserved.
	atomic {
		assert(write_off - read_off >= 0);
		j = 0;
		commit_sum = 0;
		do
		:: j < NR_SUBBUFS ->
			commit_sum = commit_sum + commit_count[j];
			j++
		:: j >= NR_SUBBUFS -> break
		od;
		assert(commit_sum <= write_off);
		//assert(events_lost == 0);
	}
}