From f99c0b5cbc04e120ca0b80b472844d9b76941308 Mon Sep 17 00:00:00 2001 From: Pierre-Marc Fournier Date: Mon, 5 Oct 2009 15:55:19 -0400 Subject: [PATCH] minor refactor of ustd to avoid deadlock Previously, we could have a deadlock between ustd and libust: - libust connects to the ustd socket and waits for a reply - while ustd is sending commands to the same libust to get information about a buffer To fix this, info about a buffer to collect is retrieved from within a different thread than the one that processes incoming commands. --- ustd/ustd.c | 217 ++++++++++++++++++++++++++++++++-------------------- ustd/ustd.h | 2 +- 2 files changed, 136 insertions(+), 83 deletions(-) diff --git a/ustd/ustd.c b/ustd/ustd.c index 24087d3..2393ea3 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -172,66 +172,6 @@ void decrement_active_buffers(void *arg) pthread_mutex_unlock(&active_buffers_mutex); } -void *consumer_thread(void *arg) -{ - struct buffer_info *buf = (struct buffer_info *) arg; - int result; - - pthread_cleanup_push(decrement_active_buffers, NULL); - - for(;;) { - /* get the subbuffer */ - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; - } - else if(result == GET_SUBBUF_DONE) { - /* this is done */ - break; - } - else if(result == GET_SUBBUF_DIED) { - finish_consuming_dead_subbuffer(buf); - break; - } - - /* write data to file */ - result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size); - if(result == -1) { - PERROR("write"); - /* FIXME: maybe drop this trace */ - } - - /* put the subbuffer */ - /* FIXME: we actually should unput the buffer before consuming... */ - result = put_subbuffer(buf); - if(result == -1) { - ERR("unknown error putting subbuffer (channel=%s)", buf->name); - break; - } - else if(result == PUT_SUBBUF_PUSHED) { - ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name); - break; - } - else if(result == PUT_SUBBUF_DIED) { - WARN("application died while putting subbuffer"); - /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */ - finish_consuming_dead_subbuffer(buf); - break; - } - else if(result == PUT_SUBBUF_OK) { - } - } - - DBG("thread for buffer %s is stopping", buf->name); - - /* FIXME: destroy, unalloc... */ - - pthread_cleanup_pop(1); - - return NULL; -} - int create_dir_if_needed(char *dir) { int result; @@ -264,7 +204,7 @@ int is_directory(const char *dir) return 1; } -int add_buffer(pid_t pid, char *bufname) +struct buffer_info *connect_buffer(pid_t pid, const char *bufname) { struct buffer_info *buf; char *send_msg; @@ -272,13 +212,12 @@ int add_buffer(pid_t pid, char *bufname) int result; char *tmp; int fd; - pthread_t thr; struct shmid_ds shmds; buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); if(buf == NULL) { ERR("add_buffer: insufficient memory"); - return -1; + return NULL; } buf->name = bufname; @@ -288,7 +227,7 @@ int add_buffer(pid_t pid, char *bufname) result = ustcomm_connect_app(buf->pid, &buf->conn); if(result) { WARN("unable to connect to process, it probably died before we were able to connect"); - return -1; + return NULL; } /* get pidunique */ @@ -297,13 +236,13 @@ int add_buffer(pid_t pid, char *bufname) free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(get_pidunique)"); - return -1; + return NULL; } result = sscanf(received_msg, "%lld", &buf->pidunique); if(result != 1) { ERR("unable to parse response to get_pidunique"); - return -1; + return NULL; } free(received_msg); DBG("got pidunique %lld", buf->pidunique); @@ -314,13 +253,13 @@ int add_buffer(pid_t pid, char *bufname) free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(get_shmid)"); - return -1; + return NULL; } result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid); if(result != 2) { ERR("unable to parse response to get_shmid"); - return -1; + return NULL; } free(received_msg); DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid); @@ -331,13 +270,13 @@ int add_buffer(pid_t pid, char *bufname) free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(g_n_subbufs)"); - return -1; + return NULL; } result = sscanf(received_msg, "%d", &buf->n_subbufs); if(result != 1) { ERR("unable to parse response to get_n_subbufs"); - return -1; + return NULL; } free(received_msg); DBG("got n_subbufs %d", buf->n_subbufs); @@ -350,7 +289,7 @@ int add_buffer(pid_t pid, char *bufname) result = sscanf(received_msg, "%d", &buf->subbuf_size); if(result != 1) { ERR("unable to parse response to get_subbuf_size"); - return -1; + return NULL; } free(received_msg); DBG("got subbuf_size %d", buf->subbuf_size); @@ -359,14 +298,14 @@ int add_buffer(pid_t pid, char *bufname) buf->mem = shmat(buf->shmid, NULL, 0); if(buf->mem == (void *) 0) { PERROR("shmat"); - return -1; + return NULL; } DBG("successfully attached buffer memory"); buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0); if(buf->bufstruct_mem == (void *) 0) { PERROR("shmat"); - return -1; + return NULL; } DBG("successfully attached buffer bufstruct memory"); @@ -374,7 +313,7 @@ int add_buffer(pid_t pid, char *bufname) result = shmctl(buf->shmid, IPC_STAT, &shmds); if(result == -1) { PERROR("shmctl"); - return -1; + return NULL; } buf->memlen = shmds.shm_segsz; @@ -387,7 +326,7 @@ int add_buffer(pid_t pid, char *bufname) result = create_dir_if_needed(USTD_DEFAULT_TRACE_PATH); if(result == -1) { ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH); - return -1; + return NULL; } trace_path = USTD_DEFAULT_TRACE_PATH; @@ -398,7 +337,7 @@ int add_buffer(pid_t pid, char *bufname) if(result == -1) { ERR("could not create directory %s", tmp); free(tmp); - return -1; + return NULL; } free(tmp); @@ -407,7 +346,7 @@ int add_buffer(pid_t pid, char *bufname) if(result == -1) { PERROR("open"); ERR("failed opening trace file %s", tmp); - return -1; + return NULL; } buf->file_fd = fd; free(tmp); @@ -416,7 +355,115 @@ int add_buffer(pid_t pid, char *bufname) active_buffers++; pthread_mutex_unlock(&active_buffers_mutex); - pthread_create(&thr, NULL, consumer_thread, buf); + return buf; +} + +int consumer_loop(struct buffer_info *buf) +{ + int result; + + pthread_cleanup_push(decrement_active_buffers, NULL); + + for(;;) { + /* get the subbuffer */ + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } + else if(result == GET_SUBBUF_DONE) { + /* this is done */ + break; + } + else if(result == GET_SUBBUF_DIED) { + finish_consuming_dead_subbuffer(buf); + break; + } + + /* write data to file */ + result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size); + if(result == -1) { + PERROR("write"); + /* FIXME: maybe drop this trace */ + } + + /* put the subbuffer */ + /* FIXME: we actually should unput the buffer before consuming... */ + result = put_subbuffer(buf); + if(result == -1) { + ERR("unknown error putting subbuffer (channel=%s)", buf->name); + break; + } + else if(result == PUT_SUBBUF_PUSHED) { + ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name); + break; + } + else if(result == PUT_SUBBUF_DIED) { + WARN("application died while putting subbuffer"); + /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */ + finish_consuming_dead_subbuffer(buf); + break; + } + else if(result == PUT_SUBBUF_OK) { + } + } + + DBG("thread for buffer %s is stopping", buf->name); + + /* FIXME: destroy, unalloc... */ + + pthread_cleanup_pop(1); + + return 0; +} + +void free_buffer(struct buffer_info *buf) +{ +} + +struct consumer_thread_args { + pid_t pid; + const char *bufname; +}; + +void *consumer_thread(void *arg) +{ + struct buffer_info *buf = (struct buffer_info *) arg; + struct consumer_thread_args *args = (struct consumer_thread_args *) arg; + + DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname); + + buf = connect_buffer(args->pid, args->bufname); + if(buf == NULL) { + ERR("failed to connect to buffer"); + goto end; + } + + consumer_loop(buf); + + free_buffer(buf); + + end: + /* bufname is free'd in free_buffer() */ + free(args); + return NULL; +} + +int start_consuming_buffer(pid_t pid, const char *bufname) +{ + pthread_t thr; + struct consumer_thread_args *args; + + DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname); + + args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args)); + + args->pid = pid; + args->bufname = strdup(bufname); + DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); + + pthread_create(&thr, NULL, consumer_thread, args); + DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); return 0; } @@ -541,7 +588,7 @@ int main(int argc, char **argv) result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100); if(result == -1) { ERR("error in ustcomm_ustd_recv_message"); - continue; + goto loop_end; } if(result > 0) { if(!strncmp(recvbuf, "collect", 7)) { @@ -551,19 +598,25 @@ int main(int argc, char **argv) result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); if(result != 2) { - fprintf(stderr, "parsing error: %s\n", recvbuf); + ERR("parsing error: %s", recvbuf); + goto free_bufname; } - result = add_buffer(pid, bufname); + result = start_consuming_buffer(pid, bufname); if(result < 0) { ERR("error in add_buffer"); - continue; + goto free_bufname; } + + free_bufname: + free(bufname); } free(recvbuf); } + loop_end: + if(terminate_req) { pthread_mutex_lock(&active_buffers_mutex); if(active_buffers == 0) { diff --git a/ustd/ustd.h b/ustd/ustd.h index 6025213..8d5becd 100644 --- a/ustd/ustd.h +++ b/ustd/ustd.h @@ -6,7 +6,7 @@ #define USTD_DEFAULT_TRACE_PATH "/tmp/usttrace" struct buffer_info { - char *name; + const char *name; pid_t pid; struct ustcomm_connection conn; -- 2.34.1