Introduce a new communication protocol for UST v5
[ust.git] / libustd / libustd.c
index 5cc210806d80203cb5acc8525f19124a101d8a1b..6e7b0cd5c145024e8a73a210508bdf5e9c63a0e8 100644 (file)
 #include "usterr.h"
 #include "ustcomm.h"
 
-/* return value: 0 = subbuffer is finished, it won't produce data anymore
- *               1 = got subbuffer successfully
- *               <0 = error
- */
-
 #define GET_SUBBUF_OK 1
 #define GET_SUBBUF_DONE 0
 #define GET_SUBBUF_DIED 2
 
 #define UNIX_PATH_MAX 108
 
-int get_subbuffer(struct buffer_info *buf)
+static int get_subbuffer(struct buffer_info *buf)
 {
-       char *send_msg=NULL;
-       char *received_msg=NULL;
-       char *rep_code=NULL;
-       int retval;
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_buffer_info _send_msg, _recv_msg;
+       struct ustcomm_buffer_info *send_msg, *recv_msg;
        int result;
 
-       if (asprintf(&send_msg, "get_subbuffer %s", buf->name) < 0) {
-               ERR("get_subbuffer : asprintf failed (%s)",
-                   buf->name);
-               retval = -1;
-               goto end;
-       }
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
+       recv_msg = &_recv_msg;
 
-       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
-       if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
-               DBG("app died while being traced");
-               retval = GET_SUBBUF_DIED;
-               goto end;
-       }
-       else if(result < 0) {
-               ERR("get_subbuffer: ustcomm_send_request failed");
-               retval = -1;
-               goto end;
+       result = ustcomm_pack_buffer_info(send_hdr, send_msg,
+                                         buf->channel, buf->channel_cpu);
+       if (result < 0) {
+               return result;
        }
 
-       result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
-       if(result != 2 && result != 1) {
-               ERR("unable to parse response to get_subbuffer");
-               retval = -1;
-               free(received_msg);
-               goto end_rep;
+       send_hdr->command = GET_SUBBUFFER;
+
+       result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg,
+                            recv_hdr, (char *)recv_msg);
+       if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) ||
+           result == 0) {
+               DBG("app died while being traced");
+               return GET_SUBBUF_DIED;
+       } else if (result < 0) {
+               ERR("get_subbuffer: ustcomm_req failed");
+               return result;
        }
 
-       if (!strcmp(rep_code, "OK")) {
+       if (!recv_hdr->result) {
                DBG("got subbuffer %s", buf->name);
-               retval = GET_SUBBUF_OK;
-       } else if(!strcmp(received_msg, "NOTFOUND")) {
-               DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
-               retval = GET_SUBBUF_DIED;
-               goto end_rep;
-       } else {
-               DBG("error getting subbuffer %s", buf->name);
-               retval = -1;
+               buf->consumed_old = recv_msg->consumed_old;
+               return GET_SUBBUF_OK;
+       } else if (recv_hdr->result == -ENODATA) {
+               DBG("For buffer %s, the trace was not found. This likely means"
+                   " it was destroyed by the user.", buf->name);
+               return GET_SUBBUF_DIED;
        }
 
-       /* FIXME: free correctly the stuff */
-end_rep:
-       if(rep_code)
-               free(rep_code);
-end:
-       if(send_msg)
-               free(send_msg);
-       if(received_msg)
-               free(received_msg);
-
-       return retval;
+       DBG("error getting subbuffer %s", buf->name);
+       return recv_hdr->result;
 }
 
-int put_subbuffer(struct buffer_info *buf)
+static int put_subbuffer(struct buffer_info *buf)
 {
-       char *send_msg=NULL;
-       char *received_msg=NULL;
-       char *rep_code=NULL;
-       int retval;
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_buffer_info _send_msg, *send_msg;
        int result;
 
-       if (asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old) < 0) {
-               ERR("put_subbuffer : asprintf failed (%s %ld)",
-                   buf->name, buf->consumed_old);
-               retval = -1;
-               goto end;
-       }
-       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
-       if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
-               retval = PUT_SUBBUF_DIED;
-               goto end;
-       }
-       else if(result < 0) {
-               ERR("put_subbuffer: send_message failed");
-               retval = -1;
-               goto end;
-       }
-       else if(result == 0) {
-               /* Program seems finished. However this might not be
-                * the last subbuffer that has to be collected.
-                */
-               retval = PUT_SUBBUF_DIED;
-               goto end;
-       }
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
 
-       result = sscanf(received_msg, "%as", &rep_code);
-       if(result != 1) {
-               ERR("unable to parse response to put_subbuffer");
-               retval = -1;
-               goto end_rep;
+       result = ustcomm_pack_buffer_info(send_hdr, send_msg,
+                                         buf->channel, buf->channel_cpu);
+       if (result < 0) {
+               return result;
        }
 
-       if(!strcmp(rep_code, "OK")) {
-               DBG("subbuffer put %s", buf->name);
-               retval = PUT_SUBBUF_OK;
-       }
-       else if(!strcmp(received_msg, "NOTFOUND")) {
-               DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
-               /* However, maybe this was not the last subbuffer. So
-                * we return the program died.
-                */
-               retval = PUT_SUBBUF_DIED;
-               goto end_rep;
-       }
-       else {
-               DBG("put_subbuffer: received error, we were pushed");
-               retval = PUT_SUBBUF_PUSHED;
-               goto end_rep;
-       }
+       send_hdr->command = PUT_SUBBUFFER;
+       send_msg->consumed_old = buf->consumed_old;
 
-end_rep:
-       if(rep_code)
-               free(rep_code);
+       result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg,
+                            recv_hdr, NULL);
+       if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) ||
+           result == 0) {
+               DBG("app died while being traced");
+               return PUT_SUBBUF_DIED;
+       } else if (result < 0) {
+               ERR("put_subbuffer: ustcomm_req failed");
+               return result;
+       }
 
-end:
-       if(send_msg)
-               free(send_msg);
-       if(received_msg)
-               free(received_msg);
+       if (!recv_hdr->result) {
+               DBG("put subbuffer %s", buf->name);
+               return PUT_SUBBUF_OK;
+       } else if (recv_hdr->result == -ENODATA) {
+               DBG("For buffer %s, the trace was not found. This likely means"
+                   " it was destroyed by the user.", buf->name);
+               return PUT_SUBBUF_DIED;
+       }
 
-       return retval;
+       DBG("error getting subbuffer %s", buf->name);
+       return recv_hdr->result;
 }
 
 void decrement_active_buffers(void *arg)
@@ -191,139 +146,219 @@ void decrement_active_buffers(void *arg)
        pthread_mutex_unlock(&instance->mutex);
 }
 
-struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, const char *bufname)
+static int get_pidunique(int sock, s64 *pidunique)
 {
-       struct buffer_info *buf;
-       char *send_msg;
-       char *received_msg;
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_pidunique _recv_msg, *recv_msg;
        int result;
-       struct shmid_ds shmds;
-       struct ustcomm_header header;
 
-       buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
-       if(buf == NULL) {
-               ERR("add_buffer: insufficient memory");
-               return NULL;
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       recv_msg = &_recv_msg;
+
+       memset(send_hdr, 0, sizeof(*send_hdr));
+
+       send_hdr->command = GET_PIDUNIQUE;
+       result = ustcomm_req(sock, send_hdr, NULL, recv_hdr, (char *)recv_msg);
+       if (result < 1) {
+               return -ENOTCONN;
+       }
+       if (recv_hdr->result < 0) {
+               ERR("App responded with error: %s", strerror(recv_hdr->result));
+               return recv_hdr->result;
        }
 
-       buf->name = bufname;
-       buf->pid = pid;
+       *pidunique = recv_msg->pidunique;
 
-       /* FIXME: Fix all the freeing and exit sequence from this functions */
-       /* connect to app */
-       result = ustcomm_connect_app(buf->pid, &buf->app_sock);
-       if(result) {
-               WARN("unable to connect to process, it probably died before we were able to connect");
-               return NULL;
+       return 0;
+}
+
+static int get_buf_shmid_pipe_fd(int sock, struct buffer_info *buf,
+                                int *buf_shmid, int *buf_struct_shmid,
+                                int *buf_pipe_fd)
+{
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_buffer_info _send_msg, *send_msg;
+       struct ustcomm_buffer_info _recv_msg, *recv_msg;
+       int result, recv_pipe_fd;
+
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
+       recv_msg = &_recv_msg;
+
+       result = ustcomm_pack_buffer_info(send_hdr, send_msg,
+                                         buf->channel, buf->channel_cpu);
+       if (result < 0) {
+               ERR("Failed to pack buffer info");
+               return result;
        }
 
-       /* get pidunique */
-       if (asprintf(&send_msg, "get_pidunique") < 0) {
-               ERR("connect_buffer : asprintf failed (get_pidunique)");
-               return NULL;
+       send_hdr->command = GET_BUF_SHMID_PIPE_FD;
+
+       result = ustcomm_send(sock, send_hdr, (char *)send_msg);
+       if (result < 1) {
+               ERR("Failed to send request");
+               return -ENOTCONN;
        }
-       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
-       free(send_msg);
-       if(result == -1) {
-               ERR("problem in ustcomm_send_request(get_pidunique)");
-               return NULL;
+       result = ustcomm_recv_fd(sock, recv_hdr, (char *)recv_msg, &recv_pipe_fd);
+       if (result < 1) {
+               ERR("Failed to receive message and fd");
+               return -ENOTCONN;
        }
-       if(result == 0) {
-               goto error;
+       if (recv_hdr->result < 0) {
+               ERR("App responded with error %s", strerror(recv_hdr->result));
+               return recv_hdr->result;
        }
 
-       result = sscanf(received_msg, "%lld", &buf->pidunique);
-       if(result != 1) {
-               ERR("unable to parse response to get_pidunique");
-               return NULL;
-       }
-       free(received_msg);
-       DBG("got pidunique %lld", buf->pidunique);
+       *buf_shmid = recv_msg->buf_shmid;
+       *buf_struct_shmid = recv_msg->buf_struct_shmid;
+       *buf_pipe_fd = recv_pipe_fd;
 
-       /* get shmid */
-       if (asprintf(&send_msg, "get_shmid %s", buf->name) < 0) {
-               ERR("connect_buffer : asprintf failed (get_schmid %s)",
-                   buf->name);
-               return NULL;
-       }
-       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
-       free(send_msg);
-       if(result == -1) {
-               ERR("problem in ustcomm_send_request(get_shmid)");
-               return NULL;
+       return 0;
+}
+
+static int get_subbuf_num_size(int sock, struct buffer_info *buf,
+                              int *subbuf_num, int *subbuf_size)
+{
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_channel_info _send_msg, *send_msg;
+       struct ustcomm_channel_info _recv_msg, *recv_msg;
+       int result;
+
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
+       recv_msg = &_recv_msg;
+
+       result = ustcomm_pack_channel_info(send_hdr, send_msg,
+                                          buf->channel);
+       if (result < 0) {
+               return result;
        }
-       if(result == 0) {
-               goto error;
+
+       send_hdr->command = GET_SUBBUF_NUM_SIZE;
+
+       result = ustcomm_req(sock, send_hdr, (char *)send_msg,
+                            recv_hdr, (char *)recv_msg);
+       if (result < 1) {
+               return -ENOTCONN;
        }
 
-       result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
-       if(result != 2) {
-               ERR("unable to parse response to get_shmid (\"%s\")", received_msg);
-               return NULL;
+       *subbuf_num = recv_msg->subbuf_num;
+       *subbuf_size = recv_msg->subbuf_size;
+
+       return recv_hdr->result;
+}
+
+
+static int notify_buffer_mapped(int sock, struct buffer_info *buf)
+{
+       struct ustcomm_header _send_hdr, *send_hdr;
+       struct ustcomm_header _recv_hdr, *recv_hdr;
+       struct ustcomm_buffer_info _send_msg, *send_msg;
+       int result;
+
+       send_hdr = &_send_hdr;
+       recv_hdr = &_recv_hdr;
+       send_msg = &_send_msg;
+
+       result = ustcomm_pack_buffer_info(send_hdr, send_msg,
+                                         buf->channel, buf->channel_cpu);
+       if (result < 0) {
+               return result;
        }
-       free(received_msg);
-       DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
 
-       /* get n_subbufs */
-       if (asprintf(&send_msg, "get_n_subbufs %s", buf->name) < 0) {
-               ERR("connect_buffer : asprintf failed (get_n_subbufs %s)",
-                   buf->name);
-               return NULL;
+       send_hdr->command = NOTIFY_BUF_MAPPED;
+
+       result = ustcomm_req(sock, send_hdr, (char *)send_msg,
+                            recv_hdr, NULL);
+       if (result < 1) {
+               return -ENOTCONN;
        }
-       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
-       free(send_msg);
-       if(result == -1) {
-               ERR("problem in ustcomm_send_request(g_n_subbufs)");
+
+       return recv_hdr->result;
+}
+
+
+struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
+                                  const char *channel, int channel_cpu)
+{
+       struct buffer_info *buf;
+       int result;
+       struct shmid_ds shmds;
+
+       buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
+       if(buf == NULL) {
+               ERR("add_buffer: insufficient memory");
                return NULL;
        }
-       if(result == 0) {
-               goto error;
-       }
 
-       result = sscanf(received_msg, "%d", &buf->n_subbufs);
-       if(result != 1) {
-               ERR("unable to parse response to get_n_subbufs");
-               return NULL;
+       buf->channel = strdup(channel);
+       if (!buf->channel) {
+               goto free_buf;
        }
-       free(received_msg);
-       DBG("got n_subbufs %d", buf->n_subbufs);
 
-       /* get subbuf size */
-       if (asprintf(&send_msg, "get_subbuf_size %s", buf->name) < 0) {
-               ERR("connect_buffer : asprintf failed (get_subbuf_size %s)",
-                   buf->name);
-               return NULL;
+       result = asprintf(&buf->name, "%s_%d", channel, channel_cpu);
+       if (result < 0 || buf->name == NULL) {
+               goto free_buf_channel;
        }
-       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
-       free(send_msg);
-       if(result == -1) {
-               ERR("problem in ustcomm_send_request(get_subbuf_size)");
-               return NULL;
+
+       buf->channel_cpu = channel_cpu;
+       buf->pid = pid;
+
+       result = ustcomm_connect_app(buf->pid, &buf->app_sock);
+       if(result) {
+               WARN("unable to connect to process, it probably died before we were able to connect");
+               goto free_buf_name;
        }
-       if(result == 0) {
-               goto error;
+
+       /* get pidunique */
+       result = get_pidunique(buf->app_sock, &buf->pidunique);
+       if (result < 0) {
+               ERR("Failed to get pidunique");
+               goto close_app_sock;
+       }
+
+       /* get shmid and pipe fd */
+       result = get_buf_shmid_pipe_fd(buf->app_sock, buf, &buf->shmid,
+                                      &buf->bufstruct_shmid, &buf->pipe_fd);
+       if (result < 0) {
+               ERR("Failed to get buf_shmid and pipe_fd");
+               goto close_app_sock;
+       } else {
+               struct stat temp;
+               fstat(buf->pipe_fd, &temp);
+               if (!S_ISFIFO(temp.st_mode)) {
+                       ERR("Didn't receive a fifo from the app");
+                       goto close_app_sock;
+               }
        }
 
-       result = sscanf(received_msg, "%d", &buf->subbuf_size);
-       if(result != 1) {
-               ERR("unable to parse response to get_subbuf_size");
-               return NULL;
+
+       /* get number of subbufs and subbuf size */
+       result = get_subbuf_num_size(buf->app_sock, buf, &buf->n_subbufs,
+                                    &buf->subbuf_size);
+       if (result < 0) {
+               ERR("Failed to get subbuf number and size");
+               goto close_fifo;
        }
-       free(received_msg);
-       DBG("got subbuf_size %d", buf->subbuf_size);
 
        /* attach memory */
        buf->mem = shmat(buf->shmid, NULL, 0);
        if(buf->mem == (void *) 0) {
                PERROR("shmat");
-               return NULL;
+               goto close_fifo;
        }
        DBG("successfully attached buffer memory");
 
        buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
        if(buf->bufstruct_mem == (void *) 0) {
                PERROR("shmat");
-               return NULL;
+               goto shmdt_mem;
        }
        DBG("successfully attached buffer bufstruct memory");
 
@@ -331,36 +366,16 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
        result = shmctl(buf->shmid, IPC_STAT, &shmds);
        if(result == -1) {
                PERROR("shmctl");
-               return NULL;
+               goto shmdt_bufstruct_mem;
        }
        buf->memlen = shmds.shm_segsz;
 
-       /* get buffer pipe fd */
-       memset(&header, 0, sizeof(header));
-       if (asprintf(&send_msg, "get_buffer_fd %s", buf->name) < 0) {
-               ERR("connect_buffer : asprintf failed (get_buffer_fd %s)",
-                   buf->name);
-               return NULL;
-       }
-       header.size = strlen(send_msg) + 1;
-       result = ustcomm_send(buf->app_sock, &header, send_msg);
-       free(send_msg);
-       if (result <= 0) {
-               ERR("ustcomm_send failed.");
-               return NULL;
-       }
-       result = ustcomm_recv_fd(buf->app_sock, &header, NULL, &buf->pipe_fd);
-       if (result <= 0) {
-               ERR("ustcomm_recv_fd failed");
-               return NULL;
-       } else {
-               struct stat temp;
-               fstat(buf->pipe_fd, &temp);
-               if (!S_ISFIFO(temp.st_mode)) {
-                       ERR("Didn't receive a fifo from the app");
-                       return NULL;
-               }
+       /* Notify the application that we have mapped the buffer */
+       result = notify_buffer_mapped(buf->app_sock, buf);
+       if (result < 0) {
+               goto shmdt_bufstruct_mem;
        }
+
        if(instance->callbacks->on_open_buffer)
                instance->callbacks->on_open_buffer(instance->callbacks, buf);
 
@@ -370,7 +385,25 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
 
        return buf;
 
-error:
+shmdt_bufstruct_mem:
+       shmdt(buf->bufstruct_mem);
+
+shmdt_mem:
+       shmdt(buf->mem);
+
+close_fifo:
+       close(buf->pipe_fd);
+
+close_app_sock:
+       close(buf->app_sock);
+
+free_buf_name:
+       free(buf->name);
+
+free_buf_channel:
+       free(buf->channel);
+
+free_buf:
        free(buf);
        return NULL;
 }
@@ -413,7 +446,7 @@ int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
                /* get the subbuffer */
                if (read_result == 1) {
                        result = get_subbuffer(buf);
-                       if(result == -1) {
+                       if (result < 0) {
                                ERR("error getting subbuffer");
                                continue;
                        } else if (result == GET_SUBBUF_DIED) {
@@ -475,7 +508,8 @@ int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
 
 struct consumer_thread_args {
        pid_t pid;
-       const char *bufname;
+       const char *channel;
+       int channel_cpu;
        struct libustd_instance *instance;
 };
 
@@ -486,8 +520,6 @@ void *consumer_thread(void *arg)
        int result;
        sigset_t sigset;
 
-       DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname);
-
        if(args->instance->callbacks->on_new_thread)
                args->instance->callbacks->on_new_thread(args->instance->callbacks);
 
@@ -513,7 +545,8 @@ void *consumer_thread(void *arg)
                goto end;
        }
 
-       buf = connect_buffer(args->instance, args->pid, args->bufname);
+       buf = connect_buffer(args->instance, args->pid,
+                            args->channel, args->channel_cpu);
        if(buf == NULL) {
                ERR("failed to connect to buffer");
                goto end;
@@ -528,26 +561,32 @@ void *consumer_thread(void *arg)
        if(args->instance->callbacks->on_close_thread)
                args->instance->callbacks->on_close_thread(args->instance->callbacks);
 
-       free((void *)args->bufname);
+       free((void *)args->channel);
        free(args);
        return NULL;
 }
 
-int start_consuming_buffer(
-       struct libustd_instance *instance, pid_t pid, const char *bufname)
+int start_consuming_buffer(struct libustd_instance *instance, pid_t pid,
+                          const char *channel, int channel_cpu)
 {
        pthread_t thr;
        struct consumer_thread_args *args;
        int result;
 
-       DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname);
+       DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid, channel,
+           channel_cpu);
 
        args = (struct consumer_thread_args *) zmalloc(sizeof(struct consumer_thread_args));
+       if (!args) {
+               return -ENOMEM;
+       }
 
        args->pid = pid;
-       args->bufname = strdup(bufname);
+       args->channel = strdup(channel);
+       args->channel_cpu = channel_cpu;
        args->instance = instance;
-       DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
+       DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s_%d",
+           args->pid, args->channel, args->channel_cpu);
 
        result = pthread_create(&thr, NULL, consumer_thread, args);
        if(result == -1) {
@@ -559,37 +598,54 @@ int start_consuming_buffer(
                ERR("pthread_detach failed");
                return -1;
        }
-       DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
+       DBG("end of start_consuming_buffer: args: pid %d bufname %s_%d",
+           args->pid, args->channel, args->channel_cpu);
 
        return 0;
 }
-static void process_client_cmd(char *recvbuf, struct libustd_instance *instance)
+static void process_client_cmd(int sock, struct ustcomm_header *req_header,
+                              char *recvbuf, struct libustd_instance *instance)
 {
-       if(!strncmp(recvbuf, "collect", 7)) {
-               pid_t pid;
-               char *bufname = NULL;
-               int result;
+       int result;
+       struct ustcomm_header _res_header;
+       struct ustcomm_header *res_header = &_res_header;
+       struct ustcomm_buffer_info *buf_inf;
+
+       DBG("Processing client command");
+
+       switch (req_header->command) {
+       case CONSUME_BUFFER:
 
-               result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
-               if (result != 2) {
-                       ERR("parsing error: %s", recvbuf);
-                       goto free_bufname;
+               buf_inf = (struct ustcomm_buffer_info *)recvbuf;
+               result = ustcomm_unpack_buffer_info(buf_inf);
+               if (result < 0) {
+                       ERR("Couldn't unpack buffer info");
+                       return;
                }
 
-               result = start_consuming_buffer(instance, pid, bufname);
+               DBG("Going to consume buffer %s_%d in process %d",
+                   buf_inf->channel, buf_inf->ch_cpu, buf_inf->pid);
+               result = start_consuming_buffer(instance, buf_inf->pid,
+                                               buf_inf->channel,
+                                               buf_inf->ch_cpu);
                if (result < 0) {
                        ERR("error in add_buffer");
-                       goto free_bufname;
+                       return;
                }
 
-       free_bufname:
-               if (bufname) {
-                       free(bufname);
-               }
-       } else if(!strncmp(recvbuf, "exit", 4)) {
+               res_header->result = 0;
+               break;
+       case EXIT:
+               res_header->result = 0;
                /* Only there to force poll to return */
-       } else {
-               WARN("unknown command: %s", recvbuf);
+               break;
+       default:
+               res_header->result = -EINVAL;
+               WARN("unknown command: %d", req_header->command);
+       }
+
+       if (ustcomm_send(sock, res_header, NULL) <= 0) {
+               ERR("couldn't send command response");
        }
 }
 
@@ -597,6 +653,8 @@ static void process_client_cmd(char *recvbuf, struct libustd_instance *instance)
 
 int libustd_start_instance(struct libustd_instance *instance)
 {
+       struct ustcomm_header recv_hdr;
+       char recv_buf[USTCOMM_BUFFER_SIZE];
        struct ustcomm_sock *epoll_sock;
        struct epoll_event events[MAX_EVENTS];
        struct sockaddr addr;
@@ -635,13 +693,14 @@ int libustd_start_instance(struct libustd_instance *instance)
                                ustcomm_init_sock(accept_fd, epoll_fd,
                                                 &instance->connections);
                        } else {
-                               char *msg = NULL;
-                               result = recv_message_conn(epoll_sock->fd, &msg);
-                               if (result == 0) {
+                               result = ustcomm_recv(epoll_sock->fd, &recv_hdr,
+                                                     recv_buf);
+                               if (result < 1) {
                                        ustcomm_del_sock(epoll_sock, 0);
-                               } else if (msg) {
-                                       process_client_cmd(msg, instance);
-                                       free(msg);
+                               } else {
+                                       process_client_cmd(epoll_sock->fd,
+                                                          &recv_hdr, recv_buf,
+                                                          instance);
                                }
 
                        }
@@ -679,6 +738,9 @@ void libustd_delete_instance(struct libustd_instance *instance)
        free(instance);
 }
 
+/* FIXME: Do something about the fixed path length, maybe get rid
+ * of the whole concept and use a pipe?
+ */
 int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
 {
        int result;
This page took 0.032712 seconds and 4 git commands to generate.