X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=ustd%2Fustd.c;h=a4c7e1f3f0c8a6dbdd17ac2fbf349a3fe7cb6f9d;hb=8cefc1450e2d929de5d2878518e7b8b2b77ff755;hp=d5884dfe040c8bed7667a03e06784e444023c0e1;hpb=ccc53fa17993db45742de9a70000ee7eefebe8f6;p=ust.git diff --git a/ustd/ustd.c b/ustd/ustd.c index d5884df..a4c7e1f 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -38,12 +38,20 @@ struct buffer_info { struct ustcomm_connection conn; int shmid; + int bufstruct_shmid; + + /* the buffer memory */ void *mem; + /* buffer size */ int memlen; - + /* number of subbuffers in buffer */ int n_subbufs; + /* size of each subbuffer */ int subbuf_size; + /* the buffer information struct */ + void *bufstruct_mem; + int file_fd; /* output file */ struct list_head list; @@ -56,6 +64,10 @@ struct buffer_info { * <0 = error */ +#define GET_SUBBUF_OK 1 +#define GET_SUBBUF_DONE 0 +#define GET_SUBBUF_DIED 2 + int get_subbuffer(struct buffer_info *buf) { char *send_msg; @@ -66,11 +78,15 @@ int get_subbuffer(struct buffer_info *buf) asprintf(&send_msg, "get_subbuffer %s", buf->name); result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + free(send_msg); if(result < 0) { ERR("get_subbuffer: ustcomm_send_request failed"); return -1; } - free(send_msg); + else if(result == 0) { + DBG("app died while being traced"); + return GET_SUBBUF_DIED; + } result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old); if(result != 2 && result != 1) { @@ -82,10 +98,10 @@ int get_subbuffer(struct buffer_info *buf) if(!strcmp(rep_code, "OK")) { DBG("got subbuffer %s", buf->name); - retval = 1; + retval = GET_SUBBUF_OK; } else if(nth_token_is(received_msg, "END", 0) == 1) { - return 0; + return GET_SUBBUF_DONE; } else { DBG("error getting subbuffer %s", buf->name); @@ -154,20 +170,80 @@ ssize_t patient_write(int fd, const void *buf, size_t count) return bufc-(const char *)buf; } +int get_subbuffer_died(struct buffer_info *buf) +{ + return 0; +} + +//int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old) +//{ +// struct ltt_channel_buf_struct *ltt_buf = buf->bufstruct_mem; +// +////ust// struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data; +// long consumed_old, consumed_idx, commit_count, write_offset; +// consumed_old = atomic_long_read(<t_buf->consumed); +// consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan); +// commit_count = local_read(<t_buf->commit_count[consumed_idx]); +// /* +// * Make sure we read the commit count before reading the buffer +// * data and the write offset. Correct consumed offset ordering +// * wrt commit count is insured by the use of cmpxchg to update +// * the consumed offset. +// */ +// smp_rmb(); +// write_offset = local_read(<t_buf->offset); +// /* +// * Check that the subbuffer we are trying to consume has been +// * already fully committed. +// */ +// if (((commit_count - buf->chan->subbuf_size) +// & ltt_channel->commit_count_mask) +// - (BUFFER_TRUNC(consumed_old, buf->chan) +// >> ltt_channel->n_subbufs_order) +// != 0) { +// return -EAGAIN; +// } +// /* +// * Check that we are not about to read the same subbuffer in +// * which the writer head is. +// */ +// if ((SUBBUF_TRUNC(write_offset, buf->chan) +// - SUBBUF_TRUNC(consumed_old, buf->chan)) +// == 0) { +// return -EAGAIN; +// } +// +// *pconsumed_old = consumed_old; +// return 0; +//} + void *consumer_thread(void *arg) { struct buffer_info *buf = (struct buffer_info *) arg; int result; + int died = 0; for(;;) { - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; + /* get the subbuffer */ + if(died == 0) { + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } + else if(result == GET_SUBBUF_DONE) { + /* this is done */ + break; + } + else if(result == GET_SUBBUF_DIED) { + died = 1; + } } - if(result == 0) { - /* this is done */ - break; + if(died == 1) { + result = get_subbuffer_died(buf); + if(result <= 0) { + break; + } } /* write data to file */ @@ -177,15 +253,23 @@ void *consumer_thread(void *arg) /* FIXME: maybe drop this trace */ } - result = put_subbuffer(buf); - if(result == -1) { - ERR("error putting subbuffer"); - break; + /* put the subbuffer */ + if(died == 0) { + result = put_subbuffer(buf); + if(result == -1) { + ERR("error putting subbuffer"); + break; + } + } + else { +// result = put_subbuffer_died(buf); } } DBG("thread for buffer %s is stopping", buf->name); + /* FIXME: destroy, unalloc... */ + return NULL; } @@ -221,13 +305,13 @@ int add_buffer(pid_t pid, char *bufname) free(send_msg); DBG("got buffer name %s", buf->name); - result = sscanf(received_msg, "%d", &buf->shmid); - if(result != 1) { + result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid); + if(result != 2) { ERR("unable to parse response to get_shmid"); return -1; } free(received_msg); - DBG("got shmid %d", buf->shmid); + DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid); /* get n_subbufs */ asprintf(&send_msg, "get_n_subbufs %s", buf->name); @@ -261,7 +345,14 @@ int add_buffer(pid_t pid, char *bufname) perror("shmat"); return -1; } - DBG("successfully attached memory"); + DBG("successfully attached buffer memory"); + + buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0); + if(buf->bufstruct_mem == (void *) 0) { + perror("shmat"); + return -1; + } + DBG("successfully attached buffer bufstruct memory"); /* open file for output */ asprintf(&tmp, "/tmp/trace/%s_0", buf->name);