Introduce a new communication protocol for UST v5
[ust.git] / libustd / libustd.c
index 47cb59499c32de3372c23ef2bc80bc4d2c775645..6e7b0cd5c145024e8a73a210508bdf5e9c63a0e8 100644 (file)
 
 #define _GNU_SOURCE
 
+#include <sys/epoll.h>
 #include <sys/shm.h>
+#include <sys/types.h>
+#include <sys/stat.h>
 #include <unistd.h>
 #include <pthread.h>
 #include <signal.h>
 #include <errno.h>
 #include <assert.h>
 
-#include "libustd.h"
+#include <ust/ustd.h>
+#include "lowlevel.h"
 #include "usterr.h"
 #include "ustcomm.h"
 
-/* return value: 0 = subbuffer is finished, it won't produce data anymore
- *               1 = got subbuffer successfully
- *               <0 = error
- */
-
 #define GET_SUBBUF_OK 1
 #define GET_SUBBUF_DONE 0
 #define GET_SUBBUF_DIED 2
 
 #define UNIX_PATH_MAX 108
 
-int get_subbuffer(struct buffer_info *buf)
+static int get_subbuffer(struct buffer_info *buf)
 {
-       char *send_msg=NULL;
-       char *received_msg=NULL;
-       char *rep_code=NULL;
-       int retval;
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_buffer_info _send_msg, _recv_msg;
+       struct ustcomm_buffer_info *send_msg, *recv_msg;
        int result;
 
-       asprintf(&send_msg, "get_subbuffer %s", buf->name);
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
-       if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
-               DBG("app died while being traced");
-               retval = GET_SUBBUF_DIED;
-               goto end;
-       }
-       else if(result < 0) {
-               ERR("get_subbuffer: ustcomm_send_request failed");
-               retval = -1;
-               goto end;
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
+       recv_msg = &_recv_msg;
+
+       result = ustcomm_pack_buffer_info(send_hdr, send_msg,
+                                         buf->channel, buf->channel_cpu);
+       if (result < 0) {
+               return result;
        }
 
-       result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
-       if(result != 2 && result != 1) {
-               ERR("unable to parse response to get_subbuffer");
-               retval = -1;
-               free(received_msg);
-               goto end_rep;
+       send_hdr->command = GET_SUBBUFFER;
+
+       result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg,
+                            recv_hdr, (char *)recv_msg);
+       if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) ||
+           result == 0) {
+               DBG("app died while being traced");
+               return GET_SUBBUF_DIED;
+       } else if (result < 0) {
+               ERR("get_subbuffer: ustcomm_req failed");
+               return result;
        }
 
-       if(!strcmp(rep_code, "OK")) {
+       if (!recv_hdr->result) {
                DBG("got subbuffer %s", buf->name);
-               retval = GET_SUBBUF_OK;
-       }
-       else if(nth_token_is(received_msg, "END", 0) == 1) {
-               retval = GET_SUBBUF_DONE;
-               goto end_rep;
-       }
-       else if(!strcmp(received_msg, "NOTFOUND")) {
-               DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
-               retval = GET_SUBBUF_DIED;
-               goto end_rep;
+               buf->consumed_old = recv_msg->consumed_old;
+               return GET_SUBBUF_OK;
+       } else if (recv_hdr->result == -ENODATA) {
+               DBG("For buffer %s, the trace was not found. This likely means"
+                   " it was destroyed by the user.", buf->name);
+               return GET_SUBBUF_DIED;
        }
-       else {
-               DBG("error getting subbuffer %s", buf->name);
-               retval = -1;
-       }
-
-       /* FIXME: free correctly the stuff */
-end_rep:
-       if(rep_code)
-               free(rep_code);
-end:
-       if(send_msg)
-               free(send_msg);
-       if(received_msg)
-               free(received_msg);
 
-       return retval;
+       DBG("error getting subbuffer %s", buf->name);
+       return recv_hdr->result;
 }
 
-int put_subbuffer(struct buffer_info *buf)
+static int put_subbuffer(struct buffer_info *buf)
 {
-       char *send_msg=NULL;
-       char *received_msg=NULL;
-       char *rep_code=NULL;
-       int retval;
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_buffer_info _send_msg, *send_msg;
        int result;
 
-       asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
-       if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
-               retval = PUT_SUBBUF_DIED;
-               goto end;
-       }
-       else if(result < 0) {
-               ERR("put_subbuffer: send_message failed");
-               retval = -1;
-               goto end;
-       }
-       else if(result == 0) {
-               /* Program seems finished. However this might not be
-                * the last subbuffer that has to be collected.
-                */
-               retval = PUT_SUBBUF_DIED;
-               goto end;
-       }
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
 
-       result = sscanf(received_msg, "%as", &rep_code);
-       if(result != 1) {
-               ERR("unable to parse response to put_subbuffer");
-               retval = -1;
-               goto end_rep;
+       result = ustcomm_pack_buffer_info(send_hdr, send_msg,
+                                         buf->channel, buf->channel_cpu);
+       if (result < 0) {
+               return result;
        }
 
-       if(!strcmp(rep_code, "OK")) {
-               DBG("subbuffer put %s", buf->name);
-               retval = PUT_SUBBUF_OK;
-       }
-       else if(!strcmp(received_msg, "NOTFOUND")) {
-               DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
-               /* However, maybe this was not the last subbuffer. So
-                * we return the program died.
-                */
-               retval = PUT_SUBBUF_DIED;
-               goto end_rep;
-       }
-       else {
-               DBG("put_subbuffer: received error, we were pushed");
-               retval = PUT_SUBBUF_PUSHED;
-               goto end_rep;
-       }
+       send_hdr->command = PUT_SUBBUFFER;
+       send_msg->consumed_old = buf->consumed_old;
 
-end_rep:
-       if(rep_code)
-               free(rep_code);
+       result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg,
+                            recv_hdr, NULL);
+       if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) ||
+           result == 0) {
+               DBG("app died while being traced");
+               return PUT_SUBBUF_DIED;
+       } else if (result < 0) {
+               ERR("put_subbuffer: ustcomm_req failed");
+               return result;
+       }
 
-end:
-       if(send_msg)
-               free(send_msg);
-       if(received_msg)
-               free(received_msg);
+       if (!recv_hdr->result) {
+               DBG("put subbuffer %s", buf->name);
+               return PUT_SUBBUF_OK;
+       } else if (recv_hdr->result == -ENODATA) {
+               DBG("For buffer %s, the trace was not found. This likely means"
+                   " it was destroyed by the user.", buf->name);
+               return PUT_SUBBUF_DIED;
+       }
 
-       return retval;
+       DBG("error getting subbuffer %s", buf->name);
+       return recv_hdr->result;
 }
 
 void decrement_active_buffers(void *arg)
@@ -182,129 +146,219 @@ void decrement_active_buffers(void *arg)
        pthread_mutex_unlock(&instance->mutex);
 }
 
-struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, const char *bufname)
+static int get_pidunique(int sock, s64 *pidunique)
 {
-       struct buffer_info *buf;
-       char *send_msg;
-       char *received_msg;
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_pidunique _recv_msg, *recv_msg;
        int result;
-       struct shmid_ds shmds;
 
-       buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
-       if(buf == NULL) {
-               ERR("add_buffer: insufficient memory");
-               return NULL;
-       }
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       recv_msg = &_recv_msg;
 
-       buf->conn = malloc(sizeof(struct ustcomm_connection));
-       if(buf->conn == NULL) {
-               ERR("add_buffer: insufficient memory");
-               free(buf);
-               return NULL;
+       memset(send_hdr, 0, sizeof(*send_hdr));
+
+       send_hdr->command = GET_PIDUNIQUE;
+       result = ustcomm_req(sock, send_hdr, NULL, recv_hdr, (char *)recv_msg);
+       if (result < 1) {
+               return -ENOTCONN;
+       }
+       if (recv_hdr->result < 0) {
+               ERR("App responded with error: %s", strerror(recv_hdr->result));
+               return recv_hdr->result;
        }
 
-       buf->name = bufname;
-       buf->pid = pid;
+       *pidunique = recv_msg->pidunique;
 
-       /* connect to app */
-       result = ustcomm_connect_app(buf->pid, buf->conn);
-       if(result) {
-               WARN("unable to connect to process, it probably died before we were able to connect");
-               return NULL;
+       return 0;
+}
+
+static int get_buf_shmid_pipe_fd(int sock, struct buffer_info *buf,
+                                int *buf_shmid, int *buf_struct_shmid,
+                                int *buf_pipe_fd)
+{
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_buffer_info _send_msg, *send_msg;
+       struct ustcomm_buffer_info _recv_msg, *recv_msg;
+       int result, recv_pipe_fd;
+
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
+       recv_msg = &_recv_msg;
+
+       result = ustcomm_pack_buffer_info(send_hdr, send_msg,
+                                         buf->channel, buf->channel_cpu);
+       if (result < 0) {
+               ERR("Failed to pack buffer info");
+               return result;
        }
 
-       /* get pidunique */
-       asprintf(&send_msg, "get_pidunique");
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
-       free(send_msg);
-       if(result == -1) {
-               ERR("problem in ustcomm_send_request(get_pidunique)");
-               return NULL;
+       send_hdr->command = GET_BUF_SHMID_PIPE_FD;
+
+       result = ustcomm_send(sock, send_hdr, (char *)send_msg);
+       if (result < 1) {
+               ERR("Failed to send request");
+               return -ENOTCONN;
+       }
+       result = ustcomm_recv_fd(sock, recv_hdr, (char *)recv_msg, &recv_pipe_fd);
+       if (result < 1) {
+               ERR("Failed to receive message and fd");
+               return -ENOTCONN;
        }
-       if(result == 0) {
-               goto error;
+       if (recv_hdr->result < 0) {
+               ERR("App responded with error %s", strerror(recv_hdr->result));
+               return recv_hdr->result;
        }
 
-       result = sscanf(received_msg, "%lld", &buf->pidunique);
-       if(result != 1) {
-               ERR("unable to parse response to get_pidunique");
-               return NULL;
+       *buf_shmid = recv_msg->buf_shmid;
+       *buf_struct_shmid = recv_msg->buf_struct_shmid;
+       *buf_pipe_fd = recv_pipe_fd;
+
+       return 0;
+}
+
+static int get_subbuf_num_size(int sock, struct buffer_info *buf,
+                              int *subbuf_num, int *subbuf_size)
+{
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_channel_info _send_msg, *send_msg;
+       struct ustcomm_channel_info _recv_msg, *recv_msg;
+       int result;
+
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
+       recv_msg = &_recv_msg;
+
+       result = ustcomm_pack_channel_info(send_hdr, send_msg,
+                                          buf->channel);
+       if (result < 0) {
+               return result;
        }
-       free(received_msg);
-       DBG("got pidunique %lld", buf->pidunique);
 
-       /* get shmid */
-       asprintf(&send_msg, "get_shmid %s", buf->name);
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
-       free(send_msg);
-       if(result == -1) {
-               ERR("problem in ustcomm_send_request(get_shmid)");
-               return NULL;
+       send_hdr->command = GET_SUBBUF_NUM_SIZE;
+
+       result = ustcomm_req(sock, send_hdr, (char *)send_msg,
+                            recv_hdr, (char *)recv_msg);
+       if (result < 1) {
+               return -ENOTCONN;
        }
-       if(result == 0) {
-               goto error;
+
+       *subbuf_num = recv_msg->subbuf_num;
+       *subbuf_size = recv_msg->subbuf_size;
+
+       return recv_hdr->result;
+}
+
+
+static int notify_buffer_mapped(int sock, struct buffer_info *buf)
+{
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_buffer_info _send_msg, *send_msg;
+       int result;
+
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
+
+       result = ustcomm_pack_buffer_info(send_hdr, send_msg,
+                                         buf->channel, buf->channel_cpu);
+       if (result < 0) {
+               return result;
        }
 
-       result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
-       if(result != 2) {
-               ERR("unable to parse response to get_shmid (\"%s\")", received_msg);
-               return NULL;
+       send_hdr->command = NOTIFY_BUF_MAPPED;
+
+       result = ustcomm_req(sock, send_hdr, (char *)send_msg,
+                            recv_hdr, NULL);
+       if (result < 1) {
+               return -ENOTCONN;
        }
-       free(received_msg);
-       DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
 
-       /* get n_subbufs */
-       asprintf(&send_msg, "get_n_subbufs %s", buf->name);
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
-       free(send_msg);
-       if(result == -1) {
-               ERR("problem in ustcomm_send_request(g_n_subbufs)");
+       return recv_hdr->result;
+}
+
+
+struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
+                                  const char *channel, int channel_cpu)
+{
+       struct buffer_info *buf;
+       int result;
+       struct shmid_ds shmds;
+
+       buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
+       if(buf == NULL) {
+               ERR("add_buffer: insufficient memory");
                return NULL;
        }
-       if(result == 0) {
-               goto error;
+
+       buf->channel = strdup(channel);
+       if (!buf->channel) {
+               goto free_buf;
        }
 
-       result = sscanf(received_msg, "%d", &buf->n_subbufs);
-       if(result != 1) {
-               ERR("unable to parse response to get_n_subbufs");
-               return NULL;
+       result = asprintf(&buf->name, "%s_%d", channel, channel_cpu);
+       if (result < 0 || buf->name == NULL) {
+               goto free_buf_channel;
        }
-       free(received_msg);
-       DBG("got n_subbufs %d", buf->n_subbufs);
 
-       /* get subbuf size */
-       asprintf(&send_msg, "get_subbuf_size %s", buf->name);
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
-       free(send_msg);
-       if(result == -1) {
-               ERR("problem in ustcomm_send_request(get_subbuf_size)");
-               return NULL;
+       buf->channel_cpu = channel_cpu;
+       buf->pid = pid;
+
+       result = ustcomm_connect_app(buf->pid, &buf->app_sock);
+       if(result) {
+               WARN("unable to connect to process, it probably died before we were able to connect");
+               goto free_buf_name;
        }
-       if(result == 0) {
-               goto error;
+
+       /* get pidunique */
+       result = get_pidunique(buf->app_sock, &buf->pidunique);
+       if (result < 0) {
+               ERR("Failed to get pidunique");
+               goto close_app_sock;
+       }
+
+       /* get shmid and pipe fd */
+       result = get_buf_shmid_pipe_fd(buf->app_sock, buf, &buf->shmid,
+                                      &buf->bufstruct_shmid, &buf->pipe_fd);
+       if (result < 0) {
+               ERR("Failed to get buf_shmid and pipe_fd");
+               goto close_app_sock;
+       } else {
+               struct stat temp;
+               fstat(buf->pipe_fd, &temp);
+               if (!S_ISFIFO(temp.st_mode)) {
+                       ERR("Didn't receive a fifo from the app");
+                       goto close_app_sock;
+               }
        }
 
-       result = sscanf(received_msg, "%d", &buf->subbuf_size);
-       if(result != 1) {
-               ERR("unable to parse response to get_subbuf_size");
-               return NULL;
+
+       /* get number of subbufs and subbuf size */
+       result = get_subbuf_num_size(buf->app_sock, buf, &buf->n_subbufs,
+                                    &buf->subbuf_size);
+       if (result < 0) {
+               ERR("Failed to get subbuf number and size");
+               goto close_fifo;
        }
-       free(received_msg);
-       DBG("got subbuf_size %d", buf->subbuf_size);
 
        /* attach memory */
        buf->mem = shmat(buf->shmid, NULL, 0);
        if(buf->mem == (void *) 0) {
                PERROR("shmat");
-               return NULL;
+               goto close_fifo;
        }
        DBG("successfully attached buffer memory");
 
        buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
        if(buf->bufstruct_mem == (void *) 0) {
                PERROR("shmat");
-               return NULL;
+               goto shmdt_mem;
        }
        DBG("successfully attached buffer bufstruct memory");
 
@@ -312,10 +366,16 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
        result = shmctl(buf->shmid, IPC_STAT, &shmds);
        if(result == -1) {
                PERROR("shmctl");
-               return NULL;
+               goto shmdt_bufstruct_mem;
        }
        buf->memlen = shmds.shm_segsz;
 
+       /* Notify the application that we have mapped the buffer */
+       result = notify_buffer_mapped(buf->app_sock, buf);
+       if (result < 0) {
+               goto shmdt_bufstruct_mem;
+       }
+
        if(instance->callbacks->on_open_buffer)
                instance->callbacks->on_open_buffer(instance->callbacks, buf);
 
@@ -325,7 +385,25 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
 
        return buf;
 
-error:
+shmdt_bufstruct_mem:
+       shmdt(buf->bufstruct_mem);
+
+shmdt_mem:
+       shmdt(buf->mem);
+
+close_fifo:
+       close(buf->pipe_fd);
+
+close_app_sock:
+       close(buf->app_sock);
+
+free_buf_name:
+       free(buf->name);
+
+free_buf_channel:
+       free(buf->channel);
+
+free_buf:
        free(buf);
        return NULL;
 }
@@ -335,7 +413,7 @@ static void destroy_buffer(struct libustd_callbacks *callbacks,
 {
        int result;
 
-       result = ustcomm_close_app(buf->conn);
+       result = close(buf->app_sock);
        if(result == -1) {
                WARN("problem calling ustcomm_close_app");
        }
@@ -353,28 +431,31 @@ static void destroy_buffer(struct libustd_callbacks *callbacks,
        if(callbacks->on_close_buffer)
                callbacks->on_close_buffer(callbacks, buf);
 
-       free(buf->conn);
        free(buf);
 }
 
 int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
 {
-       int result;
+       int result, read_result;
+       char read_buf;
 
        pthread_cleanup_push(decrement_active_buffers, instance);
 
        for(;;) {
+               read_result = read(buf->pipe_fd, &read_buf, 1);
                /* get the subbuffer */
-               result = get_subbuffer(buf);
-               if(result == -1) {
-                       ERR("error getting subbuffer");
-                       continue;
-               }
-               else if(result == GET_SUBBUF_DONE) {
-                       /* this is done */
-                       break;
-               }
-               else if(result == GET_SUBBUF_DIED) {
+               if (read_result == 1) {
+                       result = get_subbuffer(buf);
+                       if (result < 0) {
+                               ERR("error getting subbuffer");
+                               continue;
+                       } else if (result == GET_SUBBUF_DIED) {
+                               finish_consuming_dead_subbuffer(instance->callbacks, buf);
+                               break;
+                       }
+               } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) ||
+                          result == 0) {
+                       DBG("App died while being traced");
                        finish_consuming_dead_subbuffer(instance->callbacks, buf);
                        break;
                }
@@ -427,7 +508,8 @@ int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
 
 struct consumer_thread_args {
        pid_t pid;
-       const char *bufname;
+       const char *channel;
+       int channel_cpu;
        struct libustd_instance *instance;
 };
 
@@ -438,8 +520,6 @@ void *consumer_thread(void *arg)
        int result;
        sigset_t sigset;
 
-       DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname);
-
        if(args->instance->callbacks->on_new_thread)
                args->instance->callbacks->on_new_thread(args->instance->callbacks);
 
@@ -465,7 +545,8 @@ void *consumer_thread(void *arg)
                goto end;
        }
 
-       buf = connect_buffer(args->instance, args->pid, args->bufname);
+       buf = connect_buffer(args->instance, args->pid,
+                            args->channel, args->channel_cpu);
        if(buf == NULL) {
                ERR("failed to connect to buffer");
                goto end;
@@ -480,26 +561,32 @@ void *consumer_thread(void *arg)
        if(args->instance->callbacks->on_close_thread)
                args->instance->callbacks->on_close_thread(args->instance->callbacks);
 
-       free((void *)args->bufname);
+       free((void *)args->channel);
        free(args);
        return NULL;
 }
 
-int start_consuming_buffer(
-       struct libustd_instance *instance, pid_t pid, const char *bufname)
+int start_consuming_buffer(struct libustd_instance *instance, pid_t pid,
+                          const char *channel, int channel_cpu)
 {
        pthread_t thr;
        struct consumer_thread_args *args;
        int result;
 
-       DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname);
+       DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid, channel,
+           channel_cpu);
 
-       args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args));
+       args = (struct consumer_thread_args *) zmalloc(sizeof(struct consumer_thread_args));
+       if (!args) {
+               return -ENOMEM;
+       }
 
        args->pid = pid;
-       args->bufname = strdup(bufname);
+       args->channel = strdup(channel);
+       args->channel_cpu = channel_cpu;
        args->instance = instance;
-       DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
+       DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s_%d",
+           args->pid, args->channel, args->channel_cpu);
 
        result = pthread_create(&thr, NULL, consumer_thread, args);
        if(result == -1) {
@@ -511,68 +598,115 @@ int start_consuming_buffer(
                ERR("pthread_detach failed");
                return -1;
        }
-       DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
+       DBG("end of start_consuming_buffer: args: pid %d bufname %s_%d",
+           args->pid, args->channel, args->channel_cpu);
 
        return 0;
 }
+static void process_client_cmd(int sock, struct ustcomm_header *req_header,
+                              char *recvbuf, struct libustd_instance *instance)
+{
+       int result;
+       struct ustcomm_header _res_header;
+       struct ustcomm_header *res_header = &_res_header;
+       struct ustcomm_buffer_info *buf_inf;
+
+       DBG("Processing client command");
+
+       switch (req_header->command) {
+       case CONSUME_BUFFER:
+
+               buf_inf = (struct ustcomm_buffer_info *)recvbuf;
+               result = ustcomm_unpack_buffer_info(buf_inf);
+               if (result < 0) {
+                       ERR("Couldn't unpack buffer info");
+                       return;
+               }
+
+               DBG("Going to consume buffer %s_%d in process %d",
+                   buf_inf->channel, buf_inf->ch_cpu, buf_inf->pid);
+               result = start_consuming_buffer(instance, buf_inf->pid,
+                                               buf_inf->channel,
+                                               buf_inf->ch_cpu);
+               if (result < 0) {
+                       ERR("error in add_buffer");
+                       return;
+               }
+
+               res_header->result = 0;
+               break;
+       case EXIT:
+               res_header->result = 0;
+               /* Only there to force poll to return */
+               break;
+       default:
+               res_header->result = -EINVAL;
+               WARN("unknown command: %d", req_header->command);
+       }
+
+       if (ustcomm_send(sock, res_header, NULL) <= 0) {
+               ERR("couldn't send command response");
+       }
+}
+
+#define MAX_EVENTS 10
 
 int libustd_start_instance(struct libustd_instance *instance)
 {
-       int result;
-       int timeout = -1;
+       struct ustcomm_header recv_hdr;
+       char recv_buf[USTCOMM_BUFFER_SIZE];
+       struct ustcomm_sock *epoll_sock;
+       struct epoll_event events[MAX_EVENTS];
+       struct sockaddr addr;
+       int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout;
 
        if(!instance->is_init) {
                ERR("libustd instance not initialized");
                return 1;
        }
+       epoll_fd = instance->epoll_fd;
+
+       timeout = -1;
 
        /* app loop */
        for(;;) {
-               char *recvbuf;
-
-               /* check for requests on our public socket */
-               result = ustcomm_ustd_recv_message(instance->comm, &recvbuf, NULL, timeout);
-               if(result == -1 && errno == EINTR) {
+               nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
+               if (nfds == -1 && errno == EINTR) {
                        /* Caught signal */
+               } else if (nfds == -1) {
+                       PERROR("libustd_start_instance: epoll_wait failed");
+                       continue;
                }
-               else if(result == -1) {
-                       ERR("error in ustcomm_ustd_recv_message");
-                       goto loop_end;
-               }
-               else if(result > 0) {
-                       if(!strncmp(recvbuf, "collect", 7)) {
-                               pid_t pid;
-                               char *bufname;
-                               int result;
-
-                               result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
-                               if(result != 2) {
-                                       ERR("parsing error: %s", recvbuf);
-                                       goto free_bufname;
-                               }
 
-                               result = start_consuming_buffer(instance, pid, bufname);
-                               if(result < 0) {
-                                       ERR("error in add_buffer");
-                                       goto free_bufname;
+               for (i = 0; i < nfds; ++i) {
+                       epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
+                       if (epoll_sock == instance->listen_sock) {
+                               addr_size = sizeof(struct sockaddr);
+                               accept_fd = accept(epoll_sock->fd,
+                                                  &addr,
+                                                  (socklen_t *)&addr_size);
+                               if (accept_fd == -1) {
+                                       PERROR("libustd_start_instance: "
+                                              "accept failed");
+                                       continue;
+                               }
+                               ustcomm_init_sock(accept_fd, epoll_fd,
+                                                &instance->connections);
+                       } else {
+                               result = ustcomm_recv(epoll_sock->fd, &recv_hdr,
+                                                     recv_buf);
+                               if (result < 1) {
+                                       ustcomm_del_sock(epoll_sock, 0);
+                               } else {
+                                       process_client_cmd(epoll_sock->fd,
+                                                          &recv_hdr, recv_buf,
+                                                          instance);
                                }
 
-                               free_bufname:
-                               free(bufname);
-                       }
-                       else if(!strncmp(recvbuf, "exit", 4)) {
-                               /* Only there to force poll to return */
                        }
-                       else {
-                               WARN("unknown command: %s", recvbuf);
-                       }
-
-                       free(recvbuf);
                }
 
-               loop_end:
-
-               if(instance->quit_program) {
+               if (instance->quit_program) {
                        pthread_mutex_lock(&instance->mutex);
                        if(instance->active_buffers == 0) {
                                pthread_mutex_unlock(&instance->mutex);
@@ -591,17 +725,22 @@ int libustd_start_instance(struct libustd_instance *instance)
        return 0;
 }
 
+/* FIXME: threads and connections !? */
 void libustd_delete_instance(struct libustd_instance *instance)
 {
-       if(instance->is_init)
-               ustcomm_fini_ustd(instance->comm);
+       if (instance->is_init) {
+               ustcomm_del_named_sock(instance->listen_sock, 0);
+               close(instance->epoll_fd);
+       }
 
        pthread_mutex_destroy(&instance->mutex);
        free(instance->sock_path);
-       free(instance->comm);
        free(instance);
 }
 
+/* FIXME: Do something about the fixed path length, maybe get rid
+ * of the whole concept and use a pipe?
+ */
 int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
 {
        int result;
@@ -643,17 +782,13 @@ int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
        return 0;
 }
 
-struct libustd_instance *libustd_new_instance(
-       struct libustd_callbacks *callbacks, char *sock_path)
+struct libustd_instance
+*libustd_new_instance(struct libustd_callbacks *callbacks,
+                     char *sock_path)
 {
        struct libustd_instance *instance =
-               malloc(sizeof(struct libustd_instance));
-       if(!instance)
-               return NULL;
-
-       instance->comm = malloc(sizeof(struct ustcomm_ustd));
-       if(!instance->comm) {
-               free(instance);
+               zmalloc(sizeof(struct libustd_instance));
+       if(!instance) {
                return NULL;
        }
 
@@ -663,18 +798,75 @@ struct libustd_instance *libustd_new_instance(
        instance->active_buffers = 0;
        pthread_mutex_init(&instance->mutex, NULL);
 
-       if(sock_path)
+       if (sock_path) {
                instance->sock_path = strdup(sock_path);
-       else
+       } else {
                instance->sock_path = NULL;
+       }
 
        return instance;
 }
 
+static int init_ustd_socket(struct libustd_instance *instance)
+{
+       char *name;
+
+       if (instance->sock_path) {
+               if (asprintf(&name, "%s", instance->sock_path) < 0) {
+                       ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
+                           instance->sock_path);
+                       return -1;
+               }
+       } else {
+               int result;
+
+               /* Only check if socket dir exists if we are using the default directory */
+               result = ensure_dir_exists(SOCK_DIR);
+               if (result == -1) {
+                       ERR("Unable to create socket directory %s", SOCK_DIR);
+                       return -1;
+               }
+
+               if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) {
+                       ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
+                           SOCK_DIR);
+                       return -1;
+               }
+       }
+
+       /* Set up epoll */
+       instance->epoll_fd = epoll_create(MAX_EVENTS);
+       if (instance->epoll_fd == -1) {
+               ERR("epoll_create failed, start instance bailing");
+               goto free_name;
+       }
+
+       /* Create the named socket */
+       instance->listen_sock = ustcomm_init_named_socket(name,
+                                                         instance->epoll_fd);
+       if(!instance->listen_sock) {
+               ERR("error initializing named socket at %s", name);
+               goto close_epoll;
+       }
+
+       INIT_LIST_HEAD(&instance->connections);
+
+       free(name);
+
+       return 0;
+
+close_epoll:
+       close(instance->epoll_fd);
+free_name:
+       free(name);
+
+       return -1;
+}
+
 int libustd_init_instance(struct libustd_instance *instance)
 {
        int result;
-       result = ustcomm_init_ustd(instance->comm, instance->sock_path);
+       result = init_ustd_socket(instance);
        if(result == -1) {
                ERR("failed to initialize socket");
                return 1;
This page took 0.038895 seconds and 4 git commands to generate.