X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=libustd%2Flibustd.c;h=6e7b0cd5c145024e8a73a210508bdf5e9c63a0e8;hb=72098143aa5d995802b411e152b89ad252dd37ca;hp=47cb59499c32de3372c23ef2bc80bc4d2c775645;hpb=f3f8cc914bc8fdf89e328ed591bd052a981c8298;p=ust.git diff --git a/libustd/libustd.c b/libustd/libustd.c index 47cb594..6e7b0cd 100644 --- a/libustd/libustd.c +++ b/libustd/libustd.c @@ -18,7 +18,10 @@ #define _GNU_SOURCE +#include #include +#include +#include #include #include #include @@ -29,15 +32,11 @@ #include #include -#include "libustd.h" +#include +#include "lowlevel.h" #include "usterr.h" #include "ustcomm.h" -/* return value: 0 = subbuffer is finished, it won't produce data anymore - * 1 = got subbuffer successfully - * <0 = error - */ - #define GET_SUBBUF_OK 1 #define GET_SUBBUF_DONE 0 #define GET_SUBBUF_DIED 2 @@ -49,129 +48,94 @@ #define UNIX_PATH_MAX 108 -int get_subbuffer(struct buffer_info *buf) +static int get_subbuffer(struct buffer_info *buf) { - char *send_msg=NULL; - char *received_msg=NULL; - char *rep_code=NULL; - int retval; + struct ustcomm_header _send_hdr, *send_hdr; + struct ustcomm_header _recv_hdr, *recv_hdr; + struct ustcomm_buffer_info _send_msg, _recv_msg; + struct ustcomm_buffer_info *send_msg, *recv_msg; int result; - asprintf(&send_msg, "get_subbuffer %s", buf->name); - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); - if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) { - DBG("app died while being traced"); - retval = GET_SUBBUF_DIED; - goto end; - } - else if(result < 0) { - ERR("get_subbuffer: ustcomm_send_request failed"); - retval = -1; - goto end; + send_hdr = &_send_hdr; + recv_hdr = &_recv_hdr; + send_msg = &_send_msg; + recv_msg = &_recv_msg; + + result = ustcomm_pack_buffer_info(send_hdr, send_msg, + buf->channel, buf->channel_cpu); + if (result < 0) { + return result; } - result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old); - if(result != 2 && result != 1) { - ERR("unable to parse response to get_subbuffer"); - retval = -1; - free(received_msg); - goto end_rep; + send_hdr->command = GET_SUBBUFFER; + + result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg, + recv_hdr, (char *)recv_msg); + if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) || + result == 0) { + DBG("app died while being traced"); + return GET_SUBBUF_DIED; + } else if (result < 0) { + ERR("get_subbuffer: ustcomm_req failed"); + return result; } - if(!strcmp(rep_code, "OK")) { + if (!recv_hdr->result) { DBG("got subbuffer %s", buf->name); - retval = GET_SUBBUF_OK; - } - else if(nth_token_is(received_msg, "END", 0) == 1) { - retval = GET_SUBBUF_DONE; - goto end_rep; - } - else if(!strcmp(received_msg, "NOTFOUND")) { - DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name); - retval = GET_SUBBUF_DIED; - goto end_rep; + buf->consumed_old = recv_msg->consumed_old; + return GET_SUBBUF_OK; + } else if (recv_hdr->result == -ENODATA) { + DBG("For buffer %s, the trace was not found. This likely means" + " it was destroyed by the user.", buf->name); + return GET_SUBBUF_DIED; } - else { - DBG("error getting subbuffer %s", buf->name); - retval = -1; - } - - /* FIXME: free correctly the stuff */ -end_rep: - if(rep_code) - free(rep_code); -end: - if(send_msg) - free(send_msg); - if(received_msg) - free(received_msg); - return retval; + DBG("error getting subbuffer %s", buf->name); + return recv_hdr->result; } -int put_subbuffer(struct buffer_info *buf) +static int put_subbuffer(struct buffer_info *buf) { - char *send_msg=NULL; - char *received_msg=NULL; - char *rep_code=NULL; - int retval; + struct ustcomm_header _send_hdr, *send_hdr; + struct ustcomm_header _recv_hdr, *recv_hdr; + struct ustcomm_buffer_info _send_msg, *send_msg; int result; - asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old); - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); - if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) { - retval = PUT_SUBBUF_DIED; - goto end; - } - else if(result < 0) { - ERR("put_subbuffer: send_message failed"); - retval = -1; - goto end; - } - else if(result == 0) { - /* Program seems finished. However this might not be - * the last subbuffer that has to be collected. - */ - retval = PUT_SUBBUF_DIED; - goto end; - } + send_hdr = &_send_hdr; + recv_hdr = &_recv_hdr; + send_msg = &_send_msg; - result = sscanf(received_msg, "%as", &rep_code); - if(result != 1) { - ERR("unable to parse response to put_subbuffer"); - retval = -1; - goto end_rep; + result = ustcomm_pack_buffer_info(send_hdr, send_msg, + buf->channel, buf->channel_cpu); + if (result < 0) { + return result; } - if(!strcmp(rep_code, "OK")) { - DBG("subbuffer put %s", buf->name); - retval = PUT_SUBBUF_OK; - } - else if(!strcmp(received_msg, "NOTFOUND")) { - DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name); - /* However, maybe this was not the last subbuffer. So - * we return the program died. - */ - retval = PUT_SUBBUF_DIED; - goto end_rep; - } - else { - DBG("put_subbuffer: received error, we were pushed"); - retval = PUT_SUBBUF_PUSHED; - goto end_rep; - } + send_hdr->command = PUT_SUBBUFFER; + send_msg->consumed_old = buf->consumed_old; -end_rep: - if(rep_code) - free(rep_code); + result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg, + recv_hdr, NULL); + if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) || + result == 0) { + DBG("app died while being traced"); + return PUT_SUBBUF_DIED; + } else if (result < 0) { + ERR("put_subbuffer: ustcomm_req failed"); + return result; + } -end: - if(send_msg) - free(send_msg); - if(received_msg) - free(received_msg); + if (!recv_hdr->result) { + DBG("put subbuffer %s", buf->name); + return PUT_SUBBUF_OK; + } else if (recv_hdr->result == -ENODATA) { + DBG("For buffer %s, the trace was not found. This likely means" + " it was destroyed by the user.", buf->name); + return PUT_SUBBUF_DIED; + } - return retval; + DBG("error getting subbuffer %s", buf->name); + return recv_hdr->result; } void decrement_active_buffers(void *arg) @@ -182,129 +146,219 @@ void decrement_active_buffers(void *arg) pthread_mutex_unlock(&instance->mutex); } -struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, const char *bufname) +static int get_pidunique(int sock, s64 *pidunique) { - struct buffer_info *buf; - char *send_msg; - char *received_msg; + struct ustcomm_header _send_hdr, *send_hdr; + struct ustcomm_header _recv_hdr, *recv_hdr; + struct ustcomm_pidunique _recv_msg, *recv_msg; int result; - struct shmid_ds shmds; - buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); - if(buf == NULL) { - ERR("add_buffer: insufficient memory"); - return NULL; - } + send_hdr = &_send_hdr; + recv_hdr = &_recv_hdr; + recv_msg = &_recv_msg; - buf->conn = malloc(sizeof(struct ustcomm_connection)); - if(buf->conn == NULL) { - ERR("add_buffer: insufficient memory"); - free(buf); - return NULL; + memset(send_hdr, 0, sizeof(*send_hdr)); + + send_hdr->command = GET_PIDUNIQUE; + result = ustcomm_req(sock, send_hdr, NULL, recv_hdr, (char *)recv_msg); + if (result < 1) { + return -ENOTCONN; + } + if (recv_hdr->result < 0) { + ERR("App responded with error: %s", strerror(recv_hdr->result)); + return recv_hdr->result; } - buf->name = bufname; - buf->pid = pid; + *pidunique = recv_msg->pidunique; - /* connect to app */ - result = ustcomm_connect_app(buf->pid, buf->conn); - if(result) { - WARN("unable to connect to process, it probably died before we were able to connect"); - return NULL; + return 0; +} + +static int get_buf_shmid_pipe_fd(int sock, struct buffer_info *buf, + int *buf_shmid, int *buf_struct_shmid, + int *buf_pipe_fd) +{ + struct ustcomm_header _send_hdr, *send_hdr; + struct ustcomm_header _recv_hdr, *recv_hdr; + struct ustcomm_buffer_info _send_msg, *send_msg; + struct ustcomm_buffer_info _recv_msg, *recv_msg; + int result, recv_pipe_fd; + + send_hdr = &_send_hdr; + recv_hdr = &_recv_hdr; + send_msg = &_send_msg; + recv_msg = &_recv_msg; + + result = ustcomm_pack_buffer_info(send_hdr, send_msg, + buf->channel, buf->channel_cpu); + if (result < 0) { + ERR("Failed to pack buffer info"); + return result; } - /* get pidunique */ - asprintf(&send_msg, "get_pidunique"); - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); - free(send_msg); - if(result == -1) { - ERR("problem in ustcomm_send_request(get_pidunique)"); - return NULL; + send_hdr->command = GET_BUF_SHMID_PIPE_FD; + + result = ustcomm_send(sock, send_hdr, (char *)send_msg); + if (result < 1) { + ERR("Failed to send request"); + return -ENOTCONN; + } + result = ustcomm_recv_fd(sock, recv_hdr, (char *)recv_msg, &recv_pipe_fd); + if (result < 1) { + ERR("Failed to receive message and fd"); + return -ENOTCONN; } - if(result == 0) { - goto error; + if (recv_hdr->result < 0) { + ERR("App responded with error %s", strerror(recv_hdr->result)); + return recv_hdr->result; } - result = sscanf(received_msg, "%lld", &buf->pidunique); - if(result != 1) { - ERR("unable to parse response to get_pidunique"); - return NULL; + *buf_shmid = recv_msg->buf_shmid; + *buf_struct_shmid = recv_msg->buf_struct_shmid; + *buf_pipe_fd = recv_pipe_fd; + + return 0; +} + +static int get_subbuf_num_size(int sock, struct buffer_info *buf, + int *subbuf_num, int *subbuf_size) +{ + struct ustcomm_header _send_hdr, *send_hdr; + struct ustcomm_header _recv_hdr, *recv_hdr; + struct ustcomm_channel_info _send_msg, *send_msg; + struct ustcomm_channel_info _recv_msg, *recv_msg; + int result; + + send_hdr = &_send_hdr; + recv_hdr = &_recv_hdr; + send_msg = &_send_msg; + recv_msg = &_recv_msg; + + result = ustcomm_pack_channel_info(send_hdr, send_msg, + buf->channel); + if (result < 0) { + return result; } - free(received_msg); - DBG("got pidunique %lld", buf->pidunique); - /* get shmid */ - asprintf(&send_msg, "get_shmid %s", buf->name); - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); - free(send_msg); - if(result == -1) { - ERR("problem in ustcomm_send_request(get_shmid)"); - return NULL; + send_hdr->command = GET_SUBBUF_NUM_SIZE; + + result = ustcomm_req(sock, send_hdr, (char *)send_msg, + recv_hdr, (char *)recv_msg); + if (result < 1) { + return -ENOTCONN; } - if(result == 0) { - goto error; + + *subbuf_num = recv_msg->subbuf_num; + *subbuf_size = recv_msg->subbuf_size; + + return recv_hdr->result; +} + + +static int notify_buffer_mapped(int sock, struct buffer_info *buf) +{ + struct ustcomm_header _send_hdr, *send_hdr; + struct ustcomm_header _recv_hdr, *recv_hdr; + struct ustcomm_buffer_info _send_msg, *send_msg; + int result; + + send_hdr = &_send_hdr; + recv_hdr = &_recv_hdr; + send_msg = &_send_msg; + + result = ustcomm_pack_buffer_info(send_hdr, send_msg, + buf->channel, buf->channel_cpu); + if (result < 0) { + return result; } - result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid); - if(result != 2) { - ERR("unable to parse response to get_shmid (\"%s\")", received_msg); - return NULL; + send_hdr->command = NOTIFY_BUF_MAPPED; + + result = ustcomm_req(sock, send_hdr, (char *)send_msg, + recv_hdr, NULL); + if (result < 1) { + return -ENOTCONN; } - free(received_msg); - DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid); - /* get n_subbufs */ - asprintf(&send_msg, "get_n_subbufs %s", buf->name); - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); - free(send_msg); - if(result == -1) { - ERR("problem in ustcomm_send_request(g_n_subbufs)"); + return recv_hdr->result; +} + + +struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, + const char *channel, int channel_cpu) +{ + struct buffer_info *buf; + int result; + struct shmid_ds shmds; + + buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info)); + if(buf == NULL) { + ERR("add_buffer: insufficient memory"); return NULL; } - if(result == 0) { - goto error; + + buf->channel = strdup(channel); + if (!buf->channel) { + goto free_buf; } - result = sscanf(received_msg, "%d", &buf->n_subbufs); - if(result != 1) { - ERR("unable to parse response to get_n_subbufs"); - return NULL; + result = asprintf(&buf->name, "%s_%d", channel, channel_cpu); + if (result < 0 || buf->name == NULL) { + goto free_buf_channel; } - free(received_msg); - DBG("got n_subbufs %d", buf->n_subbufs); - /* get subbuf size */ - asprintf(&send_msg, "get_subbuf_size %s", buf->name); - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); - free(send_msg); - if(result == -1) { - ERR("problem in ustcomm_send_request(get_subbuf_size)"); - return NULL; + buf->channel_cpu = channel_cpu; + buf->pid = pid; + + result = ustcomm_connect_app(buf->pid, &buf->app_sock); + if(result) { + WARN("unable to connect to process, it probably died before we were able to connect"); + goto free_buf_name; } - if(result == 0) { - goto error; + + /* get pidunique */ + result = get_pidunique(buf->app_sock, &buf->pidunique); + if (result < 0) { + ERR("Failed to get pidunique"); + goto close_app_sock; + } + + /* get shmid and pipe fd */ + result = get_buf_shmid_pipe_fd(buf->app_sock, buf, &buf->shmid, + &buf->bufstruct_shmid, &buf->pipe_fd); + if (result < 0) { + ERR("Failed to get buf_shmid and pipe_fd"); + goto close_app_sock; + } else { + struct stat temp; + fstat(buf->pipe_fd, &temp); + if (!S_ISFIFO(temp.st_mode)) { + ERR("Didn't receive a fifo from the app"); + goto close_app_sock; + } } - result = sscanf(received_msg, "%d", &buf->subbuf_size); - if(result != 1) { - ERR("unable to parse response to get_subbuf_size"); - return NULL; + + /* get number of subbufs and subbuf size */ + result = get_subbuf_num_size(buf->app_sock, buf, &buf->n_subbufs, + &buf->subbuf_size); + if (result < 0) { + ERR("Failed to get subbuf number and size"); + goto close_fifo; } - free(received_msg); - DBG("got subbuf_size %d", buf->subbuf_size); /* attach memory */ buf->mem = shmat(buf->shmid, NULL, 0); if(buf->mem == (void *) 0) { PERROR("shmat"); - return NULL; + goto close_fifo; } DBG("successfully attached buffer memory"); buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0); if(buf->bufstruct_mem == (void *) 0) { PERROR("shmat"); - return NULL; + goto shmdt_mem; } DBG("successfully attached buffer bufstruct memory"); @@ -312,10 +366,16 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, result = shmctl(buf->shmid, IPC_STAT, &shmds); if(result == -1) { PERROR("shmctl"); - return NULL; + goto shmdt_bufstruct_mem; } buf->memlen = shmds.shm_segsz; + /* Notify the application that we have mapped the buffer */ + result = notify_buffer_mapped(buf->app_sock, buf); + if (result < 0) { + goto shmdt_bufstruct_mem; + } + if(instance->callbacks->on_open_buffer) instance->callbacks->on_open_buffer(instance->callbacks, buf); @@ -325,7 +385,25 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, return buf; -error: +shmdt_bufstruct_mem: + shmdt(buf->bufstruct_mem); + +shmdt_mem: + shmdt(buf->mem); + +close_fifo: + close(buf->pipe_fd); + +close_app_sock: + close(buf->app_sock); + +free_buf_name: + free(buf->name); + +free_buf_channel: + free(buf->channel); + +free_buf: free(buf); return NULL; } @@ -335,7 +413,7 @@ static void destroy_buffer(struct libustd_callbacks *callbacks, { int result; - result = ustcomm_close_app(buf->conn); + result = close(buf->app_sock); if(result == -1) { WARN("problem calling ustcomm_close_app"); } @@ -353,28 +431,31 @@ static void destroy_buffer(struct libustd_callbacks *callbacks, if(callbacks->on_close_buffer) callbacks->on_close_buffer(callbacks, buf); - free(buf->conn); free(buf); } int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf) { - int result; + int result, read_result; + char read_buf; pthread_cleanup_push(decrement_active_buffers, instance); for(;;) { + read_result = read(buf->pipe_fd, &read_buf, 1); /* get the subbuffer */ - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; - } - else if(result == GET_SUBBUF_DONE) { - /* this is done */ - break; - } - else if(result == GET_SUBBUF_DIED) { + if (read_result == 1) { + result = get_subbuffer(buf); + if (result < 0) { + ERR("error getting subbuffer"); + continue; + } else if (result == GET_SUBBUF_DIED) { + finish_consuming_dead_subbuffer(instance->callbacks, buf); + break; + } + } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) || + result == 0) { + DBG("App died while being traced"); finish_consuming_dead_subbuffer(instance->callbacks, buf); break; } @@ -427,7 +508,8 @@ int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf) struct consumer_thread_args { pid_t pid; - const char *bufname; + const char *channel; + int channel_cpu; struct libustd_instance *instance; }; @@ -438,8 +520,6 @@ void *consumer_thread(void *arg) int result; sigset_t sigset; - DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname); - if(args->instance->callbacks->on_new_thread) args->instance->callbacks->on_new_thread(args->instance->callbacks); @@ -465,7 +545,8 @@ void *consumer_thread(void *arg) goto end; } - buf = connect_buffer(args->instance, args->pid, args->bufname); + buf = connect_buffer(args->instance, args->pid, + args->channel, args->channel_cpu); if(buf == NULL) { ERR("failed to connect to buffer"); goto end; @@ -480,26 +561,32 @@ void *consumer_thread(void *arg) if(args->instance->callbacks->on_close_thread) args->instance->callbacks->on_close_thread(args->instance->callbacks); - free((void *)args->bufname); + free((void *)args->channel); free(args); return NULL; } -int start_consuming_buffer( - struct libustd_instance *instance, pid_t pid, const char *bufname) +int start_consuming_buffer(struct libustd_instance *instance, pid_t pid, + const char *channel, int channel_cpu) { pthread_t thr; struct consumer_thread_args *args; int result; - DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname); + DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid, channel, + channel_cpu); - args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args)); + args = (struct consumer_thread_args *) zmalloc(sizeof(struct consumer_thread_args)); + if (!args) { + return -ENOMEM; + } args->pid = pid; - args->bufname = strdup(bufname); + args->channel = strdup(channel); + args->channel_cpu = channel_cpu; args->instance = instance; - DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); + DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s_%d", + args->pid, args->channel, args->channel_cpu); result = pthread_create(&thr, NULL, consumer_thread, args); if(result == -1) { @@ -511,68 +598,115 @@ int start_consuming_buffer( ERR("pthread_detach failed"); return -1; } - DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); + DBG("end of start_consuming_buffer: args: pid %d bufname %s_%d", + args->pid, args->channel, args->channel_cpu); return 0; } +static void process_client_cmd(int sock, struct ustcomm_header *req_header, + char *recvbuf, struct libustd_instance *instance) +{ + int result; + struct ustcomm_header _res_header; + struct ustcomm_header *res_header = &_res_header; + struct ustcomm_buffer_info *buf_inf; + + DBG("Processing client command"); + + switch (req_header->command) { + case CONSUME_BUFFER: + + buf_inf = (struct ustcomm_buffer_info *)recvbuf; + result = ustcomm_unpack_buffer_info(buf_inf); + if (result < 0) { + ERR("Couldn't unpack buffer info"); + return; + } + + DBG("Going to consume buffer %s_%d in process %d", + buf_inf->channel, buf_inf->ch_cpu, buf_inf->pid); + result = start_consuming_buffer(instance, buf_inf->pid, + buf_inf->channel, + buf_inf->ch_cpu); + if (result < 0) { + ERR("error in add_buffer"); + return; + } + + res_header->result = 0; + break; + case EXIT: + res_header->result = 0; + /* Only there to force poll to return */ + break; + default: + res_header->result = -EINVAL; + WARN("unknown command: %d", req_header->command); + } + + if (ustcomm_send(sock, res_header, NULL) <= 0) { + ERR("couldn't send command response"); + } +} + +#define MAX_EVENTS 10 int libustd_start_instance(struct libustd_instance *instance) { - int result; - int timeout = -1; + struct ustcomm_header recv_hdr; + char recv_buf[USTCOMM_BUFFER_SIZE]; + struct ustcomm_sock *epoll_sock; + struct epoll_event events[MAX_EVENTS]; + struct sockaddr addr; + int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout; if(!instance->is_init) { ERR("libustd instance not initialized"); return 1; } + epoll_fd = instance->epoll_fd; + + timeout = -1; /* app loop */ for(;;) { - char *recvbuf; - - /* check for requests on our public socket */ - result = ustcomm_ustd_recv_message(instance->comm, &recvbuf, NULL, timeout); - if(result == -1 && errno == EINTR) { + nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout); + if (nfds == -1 && errno == EINTR) { /* Caught signal */ + } else if (nfds == -1) { + PERROR("libustd_start_instance: epoll_wait failed"); + continue; } - else if(result == -1) { - ERR("error in ustcomm_ustd_recv_message"); - goto loop_end; - } - else if(result > 0) { - if(!strncmp(recvbuf, "collect", 7)) { - pid_t pid; - char *bufname; - int result; - - result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); - if(result != 2) { - ERR("parsing error: %s", recvbuf); - goto free_bufname; - } - result = start_consuming_buffer(instance, pid, bufname); - if(result < 0) { - ERR("error in add_buffer"); - goto free_bufname; + for (i = 0; i < nfds; ++i) { + epoll_sock = (struct ustcomm_sock *)events[i].data.ptr; + if (epoll_sock == instance->listen_sock) { + addr_size = sizeof(struct sockaddr); + accept_fd = accept(epoll_sock->fd, + &addr, + (socklen_t *)&addr_size); + if (accept_fd == -1) { + PERROR("libustd_start_instance: " + "accept failed"); + continue; + } + ustcomm_init_sock(accept_fd, epoll_fd, + &instance->connections); + } else { + result = ustcomm_recv(epoll_sock->fd, &recv_hdr, + recv_buf); + if (result < 1) { + ustcomm_del_sock(epoll_sock, 0); + } else { + process_client_cmd(epoll_sock->fd, + &recv_hdr, recv_buf, + instance); } - free_bufname: - free(bufname); - } - else if(!strncmp(recvbuf, "exit", 4)) { - /* Only there to force poll to return */ } - else { - WARN("unknown command: %s", recvbuf); - } - - free(recvbuf); } - loop_end: - - if(instance->quit_program) { + if (instance->quit_program) { pthread_mutex_lock(&instance->mutex); if(instance->active_buffers == 0) { pthread_mutex_unlock(&instance->mutex); @@ -591,17 +725,22 @@ int libustd_start_instance(struct libustd_instance *instance) return 0; } +/* FIXME: threads and connections !? */ void libustd_delete_instance(struct libustd_instance *instance) { - if(instance->is_init) - ustcomm_fini_ustd(instance->comm); + if (instance->is_init) { + ustcomm_del_named_sock(instance->listen_sock, 0); + close(instance->epoll_fd); + } pthread_mutex_destroy(&instance->mutex); free(instance->sock_path); - free(instance->comm); free(instance); } +/* FIXME: Do something about the fixed path length, maybe get rid + * of the whole concept and use a pipe? + */ int libustd_stop_instance(struct libustd_instance *instance, int send_msg) { int result; @@ -643,17 +782,13 @@ int libustd_stop_instance(struct libustd_instance *instance, int send_msg) return 0; } -struct libustd_instance *libustd_new_instance( - struct libustd_callbacks *callbacks, char *sock_path) +struct libustd_instance +*libustd_new_instance(struct libustd_callbacks *callbacks, + char *sock_path) { struct libustd_instance *instance = - malloc(sizeof(struct libustd_instance)); - if(!instance) - return NULL; - - instance->comm = malloc(sizeof(struct ustcomm_ustd)); - if(!instance->comm) { - free(instance); + zmalloc(sizeof(struct libustd_instance)); + if(!instance) { return NULL; } @@ -663,18 +798,75 @@ struct libustd_instance *libustd_new_instance( instance->active_buffers = 0; pthread_mutex_init(&instance->mutex, NULL); - if(sock_path) + if (sock_path) { instance->sock_path = strdup(sock_path); - else + } else { instance->sock_path = NULL; + } return instance; } +static int init_ustd_socket(struct libustd_instance *instance) +{ + char *name; + + if (instance->sock_path) { + if (asprintf(&name, "%s", instance->sock_path) < 0) { + ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)", + instance->sock_path); + return -1; + } + } else { + int result; + + /* Only check if socket dir exists if we are using the default directory */ + result = ensure_dir_exists(SOCK_DIR); + if (result == -1) { + ERR("Unable to create socket directory %s", SOCK_DIR); + return -1; + } + + if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) { + ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)", + SOCK_DIR); + return -1; + } + } + + /* Set up epoll */ + instance->epoll_fd = epoll_create(MAX_EVENTS); + if (instance->epoll_fd == -1) { + ERR("epoll_create failed, start instance bailing"); + goto free_name; + } + + /* Create the named socket */ + instance->listen_sock = ustcomm_init_named_socket(name, + instance->epoll_fd); + if(!instance->listen_sock) { + ERR("error initializing named socket at %s", name); + goto close_epoll; + } + + INIT_LIST_HEAD(&instance->connections); + + free(name); + + return 0; + +close_epoll: + close(instance->epoll_fd); +free_name: + free(name); + + return -1; +} + int libustd_init_instance(struct libustd_instance *instance) { int result; - result = ustcomm_init_ustd(instance->comm, instance->sock_path); + result = init_ustd_socket(instance); if(result == -1) { ERR("failed to initialize socket"); return 1;