From: Pierre-Marc Fournier Date: Fri, 20 Mar 2009 02:12:36 +0000 (-0400) Subject: start implementing sharing of buffer info X-Git-Tag: v0.1~256 X-Git-Url: http://git.lttng.org/?p=ust.git;a=commitdiff_plain;h=8cefc1450e2d929de5d2878518e7b8b2b77ff755 start implementing sharing of buffer info --- diff --git a/hello/run b/hello/run index 837b8f8..ecc0bf2 100755 --- a/hello/run +++ b/hello/run @@ -1,3 +1,3 @@ #!/bin/sh -LD_LIBRARY_PATH=../libtracectl:../libmarkers:../libtracing $1 ./hello +UST_AUTOPROBE=1 UST_TRACE=1 LD_LIBRARY_PATH=../libtracectl:../libmarkers:../libtracing $1 ./hello diff --git a/libtracectl/tracectl.c b/libtracectl/tracectl.c index 1edd194..2101d5c 100644 --- a/libtracectl/tracectl.c +++ b/libtracectl/tracectl.c @@ -312,12 +312,15 @@ int listener_main(void *p) for(i=0; inr_channels; i++) { struct rchan *rchan = trace->channels[i].trans_channel_data; struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)rchan->private_data; + struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf; if(!strcmp(trace->channels[i].channel_name, channel_name)) { char *reply; DBG("the shmid for the requested channel is %d", rbuf->shmid); - asprintf(&reply, "%d", rbuf->shmid); + DBG("the shmid for its buffer structure is %d", ltt_channel->buf_shmid); + asprintf(&reply, "%d %d", rbuf->shmid, ltt_channel->buf_shmid); result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); if(result) { diff --git a/libtracing/channels.h b/libtracing/channels.h index 1c5c34f..dedc1f8 100644 --- a/libtracing/channels.h +++ b/libtracing/channels.h @@ -53,6 +53,8 @@ struct ltt_channel_struct { unsigned int subbuf_size; unsigned int subbuf_cnt; const char *channel_name; + + int buf_shmid; } ____cacheline_aligned; struct ltt_channel_setting { diff --git a/libtracing/relay.c b/libtracing/relay.c index 88bdfef..2087bcd 100644 --- a/libtracing/relay.c +++ b/libtracing/relay.c @@ -92,8 +92,8 @@ static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf); static int relay_alloc_buf(struct rchan_buf *buf, size_t *size) { - unsigned int n_pages; - struct buf_page *buf_page, *n; +//ust// unsigned int n_pages; +//ust// struct buf_page *buf_page, *n; void *ptr; int result; @@ -1512,6 +1512,48 @@ static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan) //ust// wake_up_interruptible(&trace->kref_wq); } +static void ltt_chan_alloc_ltt_buf(struct ltt_channel_struct *ltt_chan) +{ + void *ptr; + int result; + + /* Get one page */ + size_t size = PAGE_ALIGN(1); + + result = ltt_chan->buf_shmid = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700); + if(ltt_chan->buf_shmid == -1) { + PERROR("shmget"); + return -1; + } + + ptr = shmat(ltt_chan->buf_shmid, NULL, 0); + if(ptr == (void *) -1) { + perror("shmat"); + goto destroy_shmem; + } + + /* Already mark the shared memory for destruction. This will occur only + * when all users have detached. + */ + result = shmctl(ltt_chan->buf_shmid, IPC_RMID, NULL); + if(result == -1) { + perror("shmctl"); + return -1; + } + + ltt_chan->buf = ptr; + + return 0; + + destroy_shmem: + result = shmctl(ltt_chan->buf_shmid, IPC_RMID, NULL); + if(result == -1) { + perror("shmctl"); + } + + return -1; +} + /* * Create channel. */ @@ -1546,7 +1588,10 @@ static int ltt_relay_create_channel(const char *trace_name, ltt_chan->n_subbufs_order = get_count_order(n_subbufs); ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order); //ust// ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map); - ltt_chan->buf = malloc(sizeof(struct ltt_channel_buf_struct)); + + ltt_chan_alloc_ltt_buf(ltt_chan); + +//ust// ltt_chan->buf = malloc(sizeof(struct ltt_channel_buf_struct)); if (!ltt_chan->buf) goto alloc_error; ltt_chan->trans_channel_data = ltt_relay_open(tmpname, diff --git a/libustcomm/ustcomm.c b/libustcomm/ustcomm.c index 5468ad4..d6e5346 100644 --- a/libustcomm/ustcomm.c +++ b/libustcomm/ustcomm.c @@ -399,9 +399,6 @@ int ustcomm_send_request(struct ustcomm_connection *conn, char *req, char **repl PERROR("send"); return -1; } - else if(result == 0) { - return 0; - } if(!reply) return 1; diff --git a/ustd/ustd.c b/ustd/ustd.c index d5884df..a4c7e1f 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -38,12 +38,20 @@ struct buffer_info { struct ustcomm_connection conn; int shmid; + int bufstruct_shmid; + + /* the buffer memory */ void *mem; + /* buffer size */ int memlen; - + /* number of subbuffers in buffer */ int n_subbufs; + /* size of each subbuffer */ int subbuf_size; + /* the buffer information struct */ + void *bufstruct_mem; + int file_fd; /* output file */ struct list_head list; @@ -56,6 +64,10 @@ struct buffer_info { * <0 = error */ +#define GET_SUBBUF_OK 1 +#define GET_SUBBUF_DONE 0 +#define GET_SUBBUF_DIED 2 + int get_subbuffer(struct buffer_info *buf) { char *send_msg; @@ -66,11 +78,15 @@ int get_subbuffer(struct buffer_info *buf) asprintf(&send_msg, "get_subbuffer %s", buf->name); result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + free(send_msg); if(result < 0) { ERR("get_subbuffer: ustcomm_send_request failed"); return -1; } - free(send_msg); + else if(result == 0) { + DBG("app died while being traced"); + return GET_SUBBUF_DIED; + } result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old); if(result != 2 && result != 1) { @@ -82,10 +98,10 @@ int get_subbuffer(struct buffer_info *buf) if(!strcmp(rep_code, "OK")) { DBG("got subbuffer %s", buf->name); - retval = 1; + retval = GET_SUBBUF_OK; } else if(nth_token_is(received_msg, "END", 0) == 1) { - return 0; + return GET_SUBBUF_DONE; } else { DBG("error getting subbuffer %s", buf->name); @@ -154,20 +170,80 @@ ssize_t patient_write(int fd, const void *buf, size_t count) return bufc-(const char *)buf; } +int get_subbuffer_died(struct buffer_info *buf) +{ + return 0; +} + +//int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old) +//{ +// struct ltt_channel_buf_struct *ltt_buf = buf->bufstruct_mem; +// +////ust// struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data; +// long consumed_old, consumed_idx, commit_count, write_offset; +// consumed_old = atomic_long_read(<t_buf->consumed); +// consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan); +// commit_count = local_read(<t_buf->commit_count[consumed_idx]); +// /* +// * Make sure we read the commit count before reading the buffer +// * data and the write offset. Correct consumed offset ordering +// * wrt commit count is insured by the use of cmpxchg to update +// * the consumed offset. +// */ +// smp_rmb(); +// write_offset = local_read(<t_buf->offset); +// /* +// * Check that the subbuffer we are trying to consume has been +// * already fully committed. +// */ +// if (((commit_count - buf->chan->subbuf_size) +// & ltt_channel->commit_count_mask) +// - (BUFFER_TRUNC(consumed_old, buf->chan) +// >> ltt_channel->n_subbufs_order) +// != 0) { +// return -EAGAIN; +// } +// /* +// * Check that we are not about to read the same subbuffer in +// * which the writer head is. +// */ +// if ((SUBBUF_TRUNC(write_offset, buf->chan) +// - SUBBUF_TRUNC(consumed_old, buf->chan)) +// == 0) { +// return -EAGAIN; +// } +// +// *pconsumed_old = consumed_old; +// return 0; +//} + void *consumer_thread(void *arg) { struct buffer_info *buf = (struct buffer_info *) arg; int result; + int died = 0; for(;;) { - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; + /* get the subbuffer */ + if(died == 0) { + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } + else if(result == GET_SUBBUF_DONE) { + /* this is done */ + break; + } + else if(result == GET_SUBBUF_DIED) { + died = 1; + } } - if(result == 0) { - /* this is done */ - break; + if(died == 1) { + result = get_subbuffer_died(buf); + if(result <= 0) { + break; + } } /* write data to file */ @@ -177,15 +253,23 @@ void *consumer_thread(void *arg) /* FIXME: maybe drop this trace */ } - result = put_subbuffer(buf); - if(result == -1) { - ERR("error putting subbuffer"); - break; + /* put the subbuffer */ + if(died == 0) { + result = put_subbuffer(buf); + if(result == -1) { + ERR("error putting subbuffer"); + break; + } + } + else { +// result = put_subbuffer_died(buf); } } DBG("thread for buffer %s is stopping", buf->name); + /* FIXME: destroy, unalloc... */ + return NULL; } @@ -221,13 +305,13 @@ int add_buffer(pid_t pid, char *bufname) free(send_msg); DBG("got buffer name %s", buf->name); - result = sscanf(received_msg, "%d", &buf->shmid); - if(result != 1) { + result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid); + if(result != 2) { ERR("unable to parse response to get_shmid"); return -1; } free(received_msg); - DBG("got shmid %d", buf->shmid); + DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid); /* get n_subbufs */ asprintf(&send_msg, "get_n_subbufs %s", buf->name); @@ -261,7 +345,14 @@ int add_buffer(pid_t pid, char *bufname) perror("shmat"); return -1; } - DBG("successfully attached memory"); + DBG("successfully attached buffer memory"); + + buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0); + if(buf->bufstruct_mem == (void *) 0) { + perror("shmat"); + return -1; + } + DBG("successfully attached buffer bufstruct memory"); /* open file for output */ asprintf(&tmp, "/tmp/trace/%s_0", buf->name);