#include <stdio.h>
#include <string.h>
+#include "ustd.h"
#include "localerr.h"
#include "ustcomm.h"
struct list_head buffers = LIST_HEAD_INIT(buffers);
-struct buffer_info {
- char *name;
- pid_t pid;
- struct ustcomm_connection conn;
-
- int shmid;
- int bufstruct_shmid;
-
- /* the buffer memory */
- void *mem;
- /* buffer size */
- int memlen;
- /* number of subbuffers in buffer */
- int n_subbufs;
- /* size of each subbuffer */
- int subbuf_size;
-
- /* the buffer information struct */
- void *bufstruct_mem;
-
- int file_fd; /* output file */
-
- struct list_head list;
-
- long consumed_old;
-};
-
/* return value: 0 = subbuffer is finished, it won't produce data anymore
* 1 = got subbuffer successfully
* <0 = error
return bufc-(const char *)buf;
}
-int get_subbuffer_died(struct buffer_info *buf)
-{
- return 0;
-}
-
-//int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
-//{
-// struct ltt_channel_buf_struct *ltt_buf = buf->bufstruct_mem;
-//
-////ust// struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data;
-// long consumed_old, consumed_idx, commit_count, write_offset;
-// consumed_old = atomic_long_read(<t_buf->consumed);
-// consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
-// commit_count = local_read(<t_buf->commit_count[consumed_idx]);
-// /*
-// * Make sure we read the commit count before reading the buffer
-// * data and the write offset. Correct consumed offset ordering
-// * wrt commit count is insured by the use of cmpxchg to update
-// * the consumed offset.
-// */
-// smp_rmb();
-// write_offset = local_read(<t_buf->offset);
-// /*
-// * Check that the subbuffer we are trying to consume has been
-// * already fully committed.
-// */
-// if (((commit_count - buf->chan->subbuf_size)
-// & ltt_channel->commit_count_mask)
-// - (BUFFER_TRUNC(consumed_old, buf->chan)
-// >> ltt_channel->n_subbufs_order)
-// != 0) {
-// return -EAGAIN;
-// }
-// /*
-// * Check that we are not about to read the same subbuffer in
-// * which the writer head is.
-// */
-// if ((SUBBUF_TRUNC(write_offset, buf->chan)
-// - SUBBUF_TRUNC(consumed_old, buf->chan))
-// == 0) {
-// return -EAGAIN;
-// }
-//
-// *pconsumed_old = consumed_old;
-// return 0;
-//}
-
void *consumer_thread(void *arg)
{
struct buffer_info *buf = (struct buffer_info *) arg;
int result;
- int died = 0;
for(;;) {
/* get the subbuffer */
- if(died == 0) {
- result = get_subbuffer(buf);
- if(result == -1) {
- ERR("error getting subbuffer");
- continue;
- }
- else if(result == GET_SUBBUF_DONE) {
- /* this is done */
- break;
- }
- else if(result == GET_SUBBUF_DIED) {
- died = 1;
- }
+ result = get_subbuffer(buf);
+ if(result == -1) {
+ ERR("error getting subbuffer");
+ continue;
}
- if(died == 1) {
- result = get_subbuffer_died(buf);
- if(result <= 0) {
- break;
- }
+ else if(result == GET_SUBBUF_DONE) {
+ /* this is done */
+ break;
+ }
+ else if(result == GET_SUBBUF_DIED) {
+ finish_consuming_dead_subbuffer(buf);
+ break;
}
/* write data to file */
}
/* put the subbuffer */
- if(died == 0) {
- result = put_subbuffer(buf);
- if(result == -1) {
- ERR("error putting subbuffer");
- break;
- }
- }
- else {
-// result = put_subbuffer_died(buf);
+ result = put_subbuffer(buf);
+ if(result == -1) {
+ ERR("error putting subbuffer");
+ break;
}
}