Move away files that will be deprecated
[lttng-ust.git] / libustconsumer / libustconsumer.c
diff --git a/libustconsumer/libustconsumer.c b/libustconsumer/libustconsumer.c
deleted file mode 100644 (file)
index c6dd20c..0000000
+++ /dev/null
@@ -1,923 +0,0 @@
-/* Copyright (C) 2009  Pierre-Marc Fournier
- *               2010  Alexis Halle
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
- */
-
-#define _GNU_SOURCE
-
-#include <sys/epoll.h>
-#include <sys/shm.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <pthread.h>
-#include <signal.h>
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-
-#include <ust/ustconsumer.h>
-#include "lowlevel.h"
-#include "usterr_signal_safe.h"
-#include "ustcomm.h"
-
-#define GET_SUBBUF_OK 1
-#define GET_SUBBUF_DONE 0
-#define GET_SUBBUF_DIED 2
-
-#define PUT_SUBBUF_OK 1
-#define PUT_SUBBUF_DIED 0
-#define PUT_SUBBUF_PUSHED 2
-#define PUT_SUBBUF_DONE 3
-
-#define UNIX_PATH_MAX 108
-
-static int get_subbuffer(struct buffer_info *buf)
-{
-       struct ustcomm_header _send_hdr, *send_hdr;
-       struct ustcomm_header _recv_hdr, *recv_hdr;
-       struct ustcomm_buffer_info _send_msg, _recv_msg;
-       struct ustcomm_buffer_info *send_msg, *recv_msg;
-       int result;
-
-       send_hdr = &_send_hdr;
-       recv_hdr = &_recv_hdr;
-       send_msg = &_send_msg;
-       recv_msg = &_recv_msg;
-
-       result = ustcomm_pack_buffer_info(send_hdr, send_msg, buf->trace,
-                                         buf->channel, buf->channel_cpu);
-       if (result < 0) {
-               return result;
-       }
-
-       send_hdr->command = GET_SUBBUFFER;
-
-       result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg,
-                            recv_hdr, (char *)recv_msg);
-       if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) ||
-           result == 0) {
-               DBG("app died while being traced");
-               return GET_SUBBUF_DIED;
-       } else if (result < 0) {
-               ERR("get_subbuffer: ustcomm_req failed");
-               return result;
-       }
-
-       if (!recv_hdr->result) {
-               DBG("got subbuffer %s", buf->name);
-               buf->consumed_old = recv_msg->consumed_old;
-               return GET_SUBBUF_OK;
-       } else if (recv_hdr->result == -ENODATA) {
-               DBG("For buffer %s, the trace was not found. This likely means"
-                   " it was destroyed by the user.", buf->name);
-               return GET_SUBBUF_DIED;
-       }
-
-       DBG("error getting subbuffer %s", buf->name);
-       return recv_hdr->result;
-}
-
-static int put_subbuffer(struct buffer_info *buf)
-{
-       struct ustcomm_header _send_hdr, *send_hdr;
-       struct ustcomm_header _recv_hdr, *recv_hdr;
-       struct ustcomm_buffer_info _send_msg, *send_msg;
-       int result;
-
-       send_hdr = &_send_hdr;
-       recv_hdr = &_recv_hdr;
-       send_msg = &_send_msg;
-
-       result = ustcomm_pack_buffer_info(send_hdr, send_msg, buf->trace,
-                                         buf->channel, buf->channel_cpu);
-       if (result < 0) {
-               return result;
-       }
-
-       send_hdr->command = PUT_SUBBUFFER;
-       send_msg->consumed_old = buf->consumed_old;
-
-       result = ustcomm_req(buf->app_sock, send_hdr, (char *)send_msg,
-                            recv_hdr, NULL);
-       if ((result < 0 && (errno == ECONNRESET || errno == EPIPE)) ||
-           result == 0) {
-               DBG("app died while being traced");
-               return PUT_SUBBUF_DIED;
-       } else if (result < 0) {
-               ERR("put_subbuffer: ustcomm_req failed");
-               return result;
-       }
-
-       if (!recv_hdr->result) {
-               DBG("put subbuffer %s", buf->name);
-               return PUT_SUBBUF_OK;
-       } else if (recv_hdr->result == -ENODATA) {
-               DBG("For buffer %s, the trace was not found. This likely means"
-                   " it was destroyed by the user.", buf->name);
-               return PUT_SUBBUF_DIED;
-       }
-
-       DBG("error getting subbuffer %s", buf->name);
-       return recv_hdr->result;
-}
-
-void decrement_active_buffers(void *arg)
-{
-       struct ustconsumer_instance *instance = arg;
-       pthread_mutex_lock(&instance->mutex);
-       instance->active_buffers--;
-       pthread_mutex_unlock(&instance->mutex);
-}
-
-static int get_pidunique(int sock, int64_t *pidunique)
-{
-       struct ustcomm_header _send_hdr, *send_hdr;
-       struct ustcomm_header _recv_hdr, *recv_hdr;
-       struct ustcomm_pidunique _recv_msg, *recv_msg;
-       int result;
-
-       send_hdr = &_send_hdr;
-       recv_hdr = &_recv_hdr;
-       recv_msg = &_recv_msg;
-
-       memset(send_hdr, 0, sizeof(*send_hdr));
-
-       send_hdr->command = GET_PIDUNIQUE;
-       result = ustcomm_req(sock, send_hdr, NULL, recv_hdr, (char *)recv_msg);
-       if (result < 1) {
-               return -ENOTCONN;
-       }
-       if (recv_hdr->result < 0) {
-               ERR("App responded with error: %s", strerror(recv_hdr->result));
-               return recv_hdr->result;
-       }
-
-       *pidunique = recv_msg->pidunique;
-
-       return 0;
-}
-
-static int get_buf_shmid_pipe_fd(int sock, struct buffer_info *buf,
-                                int *buf_shmid, int *buf_struct_shmid,
-                                int *buf_pipe_fd)
-{
-       struct ustcomm_header _send_hdr, *send_hdr;
-       struct ustcomm_header _recv_hdr, *recv_hdr;
-       struct ustcomm_buffer_info _send_msg, *send_msg;
-       struct ustcomm_buffer_info _recv_msg, *recv_msg;
-       int result, recv_pipe_fd;
-
-       send_hdr = &_send_hdr;
-       recv_hdr = &_recv_hdr;
-       send_msg = &_send_msg;
-       recv_msg = &_recv_msg;
-
-       result = ustcomm_pack_buffer_info(send_hdr, send_msg, buf->trace,
-                                         buf->channel, buf->channel_cpu);
-       if (result < 0) {
-               ERR("Failed to pack buffer info");
-               return result;
-       }
-
-       send_hdr->command = GET_BUF_SHMID_PIPE_FD;
-
-       result = ustcomm_send(sock, send_hdr, (char *)send_msg);
-       if (result < 1) {
-               ERR("Failed to send request");
-               return -ENOTCONN;
-       }
-       result = ustcomm_recv_fd(sock, recv_hdr, (char *)recv_msg, &recv_pipe_fd);
-       if (result < 1) {
-               ERR("Failed to receive message and fd");
-               return -ENOTCONN;
-       }
-       if (recv_hdr->result < 0) {
-               ERR("App responded with error %s", strerror(recv_hdr->result));
-               return recv_hdr->result;
-       }
-
-       *buf_shmid = recv_msg->buf_shmid;
-       *buf_struct_shmid = recv_msg->buf_struct_shmid;
-       *buf_pipe_fd = recv_pipe_fd;
-
-       return 0;
-}
-
-static int get_subbuf_num_size(int sock, struct buffer_info *buf,
-                              int *subbuf_num, int *subbuf_size)
-{
-       struct ustcomm_header _send_hdr, *send_hdr;
-       struct ustcomm_header _recv_hdr, *recv_hdr;
-       struct ustcomm_channel_info _send_msg, *send_msg;
-       struct ustcomm_channel_info _recv_msg, *recv_msg;
-       int result;
-
-       send_hdr = &_send_hdr;
-       recv_hdr = &_recv_hdr;
-       send_msg = &_send_msg;
-       recv_msg = &_recv_msg;
-
-       result = ustcomm_pack_channel_info(send_hdr, send_msg, buf->trace,
-                                          buf->channel);
-       if (result < 0) {
-               return result;
-       }
-
-       send_hdr->command = GET_SUBBUF_NUM_SIZE;
-
-       result = ustcomm_req(sock, send_hdr, (char *)send_msg,
-                            recv_hdr, (char *)recv_msg);
-       if (result < 1) {
-               return -ENOTCONN;
-       }
-
-       *subbuf_num = recv_msg->subbuf_num;
-       *subbuf_size = recv_msg->subbuf_size;
-
-       return recv_hdr->result;
-}
-
-
-static int notify_buffer_mapped(int sock, struct buffer_info *buf)
-{
-       struct ustcomm_header _send_hdr, *send_hdr;
-       struct ustcomm_header _recv_hdr, *recv_hdr;
-       struct ustcomm_buffer_info _send_msg, *send_msg;
-       int result;
-
-       send_hdr = &_send_hdr;
-       recv_hdr = &_recv_hdr;
-       send_msg = &_send_msg;
-
-       result = ustcomm_pack_buffer_info(send_hdr, send_msg, buf->trace,
-                                         buf->channel, buf->channel_cpu);
-       if (result < 0) {
-               return result;
-       }
-
-       send_hdr->command = NOTIFY_BUF_MAPPED;
-
-       result = ustcomm_req(sock, send_hdr, (char *)send_msg,
-                            recv_hdr, NULL);
-       if (result < 1) {
-               return -ENOTCONN;
-       }
-
-       return recv_hdr->result;
-}
-
-
-struct buffer_info *connect_buffer(struct ustconsumer_instance *instance, pid_t pid,
-                                  const char *trace, const char *channel,
-                                  int channel_cpu)
-{
-       struct buffer_info *buf;
-       int result;
-       struct shmid_ds shmds;
-
-       buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
-       if(buf == NULL) {
-               ERR("add_buffer: insufficient memory");
-               return NULL;
-       }
-
-       buf->trace = strdup(trace);
-       if (!buf->trace) {
-               goto free_buf;
-       }
-
-       buf->channel = strdup(channel);
-       if (!buf->channel) {
-               goto free_buf_trace;
-       }
-
-       result = asprintf(&buf->name, "%s_%d", channel, channel_cpu);
-       if (result < 0 || buf->name == NULL) {
-               goto free_buf_channel;
-       }
-
-       buf->channel_cpu = channel_cpu;
-       buf->pid = pid;
-
-       result = ustcomm_connect_app(buf->pid, &buf->app_sock);
-       if(result) {
-               WARN("unable to connect to process, it probably died before we were able to connect");
-               goto free_buf_name;
-       }
-
-       /* get pidunique */
-       result = get_pidunique(buf->app_sock, &buf->pidunique);
-       if (result < 0) {
-               ERR("Failed to get pidunique");
-               goto close_app_sock;
-       }
-
-       /* get shmid and pipe fd */
-       result = get_buf_shmid_pipe_fd(buf->app_sock, buf, &buf->shmid,
-                                      &buf->bufstruct_shmid, &buf->pipe_fd);
-       if (result < 0) {
-               ERR("Failed to get buf_shmid and pipe_fd");
-               goto close_app_sock;
-       } else {
-               struct stat temp;
-               fstat(buf->pipe_fd, &temp);
-               if (!S_ISFIFO(temp.st_mode)) {
-                       ERR("Didn't receive a fifo from the app");
-                       goto close_app_sock;
-               }
-       }
-
-
-       /* get number of subbufs and subbuf size */
-       result = get_subbuf_num_size(buf->app_sock, buf, &buf->n_subbufs,
-                                    &buf->subbuf_size);
-       if (result < 0) {
-               ERR("Failed to get subbuf number and size");
-               goto close_fifo;
-       }
-
-       /* Set subbuffer's information */
-       buf->subbuf_size_order = get_count_order(buf->subbuf_size);
-       buf->alloc_size = buf->subbuf_size * buf->n_subbufs;
-
-       /* attach memory */
-       buf->mem = shmat(buf->shmid, NULL, 0);
-       if(buf->mem == (void *) 0) {
-               PERROR("shmat");
-               goto close_fifo;
-       }
-       DBG("successfully attached buffer memory");
-
-       buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
-       if(buf->bufstruct_mem == (void *) 0) {
-               PERROR("shmat");
-               goto shmdt_mem;
-       }
-       DBG("successfully attached buffer bufstruct memory");
-
-       /* obtain info on the memory segment */
-       result = shmctl(buf->shmid, IPC_STAT, &shmds);
-       if(result == -1) {
-               PERROR("shmctl");
-               goto shmdt_bufstruct_mem;
-       }
-       buf->memlen = shmds.shm_segsz;
-
-       /* Notify the application that we have mapped the buffer */
-       result = notify_buffer_mapped(buf->app_sock, buf);
-       if (result < 0) {
-               goto shmdt_bufstruct_mem;
-       }
-
-       if(instance->callbacks->on_open_buffer)
-               instance->callbacks->on_open_buffer(instance->callbacks, buf);
-
-       pthread_mutex_lock(&instance->mutex);
-       instance->active_buffers++;
-       pthread_mutex_unlock(&instance->mutex);
-
-       return buf;
-
-shmdt_bufstruct_mem:
-       shmdt(buf->bufstruct_mem);
-
-shmdt_mem:
-       shmdt(buf->mem);
-
-close_fifo:
-       close(buf->pipe_fd);
-
-close_app_sock:
-       close(buf->app_sock);
-
-free_buf_name:
-       free(buf->name);
-
-free_buf_channel:
-       free(buf->channel);
-
-free_buf_trace:
-       free(buf->trace);
-
-free_buf:
-       free(buf);
-       return NULL;
-}
-
-static void destroy_buffer(struct ustconsumer_callbacks *callbacks,
-                       struct buffer_info *buf)
-{
-       int result;
-
-       result = close(buf->pipe_fd);
-       if(result == -1) {
-               WARN("problem closing the pipe fd");
-       }
-
-       result = close(buf->app_sock);
-       if(result == -1) {
-               WARN("problem calling ustcomm_close_app");
-       }
-
-       result = shmdt(buf->mem);
-       if(result == -1) {
-               PERROR("shmdt");
-       }
-
-       result = shmdt(buf->bufstruct_mem);
-       if(result == -1) {
-               PERROR("shmdt");
-       }
-
-       if(callbacks->on_close_buffer)
-               callbacks->on_close_buffer(callbacks, buf);
-
-       free(buf);
-}
-
-int consumer_loop(struct ustconsumer_instance *instance, struct buffer_info *buf)
-{
-       int result = 0;
-       int read_result;
-       char read_buf;
-
-       pthread_cleanup_push(decrement_active_buffers, instance);
-
-       for(;;) {
-               read_result = read(buf->pipe_fd, &read_buf, 1);
-               /* get the subbuffer */
-               if (read_result == 1) {
-                       result = get_subbuffer(buf);
-                       if (result < 0) {
-                               ERR("error getting subbuffer");
-                               continue;
-                       } else if (result == GET_SUBBUF_DIED) {
-                               finish_consuming_dead_subbuffer(instance->callbacks, buf);
-                               break;
-                       }
-               } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) ||
-                          result == 0) {
-                       DBG("App died while being traced");
-                       finish_consuming_dead_subbuffer(instance->callbacks, buf);
-                       break;
-               } else if (read_result == -1 && errno == EINTR) {
-                       continue;
-               }
-
-               if(instance->callbacks->on_read_subbuffer)
-                       instance->callbacks->on_read_subbuffer(instance->callbacks, buf);
-
-               /* put the subbuffer */
-               result = put_subbuffer(buf);
-               if(result == -1) {
-                       ERR("unknown error putting subbuffer (channel=%s)", buf->name);
-                       break;
-               }
-               else if(result == PUT_SUBBUF_PUSHED) {
-                       ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
-                       break;
-               }
-               else if(result == PUT_SUBBUF_DIED) {
-                       DBG("application died while putting subbuffer");
-                       /* Skip the first subbuffer. We are not sure it is trustable
-                        * because the put_subbuffer() did not complete.
-                        */
-                       /* TODO: check on_put_error return value */
-                       if(instance->callbacks->on_put_error)
-                               instance->callbacks->on_put_error(instance->callbacks, buf);
-
-                       finish_consuming_dead_subbuffer(instance->callbacks, buf);
-                       break;
-               }
-               else if(result == PUT_SUBBUF_DONE) {
-                       /* Done with this subbuffer */
-                       /* FIXME: add a case where this branch is used? Upon
-                        * normal trace termination, at put_subbuf time, a
-                        * special last-subbuffer code could be returned by
-                        * the listener.
-                        */
-                       break;
-               }
-               else if(result == PUT_SUBBUF_OK) {
-               }
-       }
-
-       DBG("thread for buffer %s is stopping", buf->name);
-
-       /* FIXME: destroy, unalloc... */
-
-       pthread_cleanup_pop(1);
-
-       return 0;
-}
-
-struct consumer_thread_args {
-       pid_t pid;
-       const char *trace;
-       const char *channel;
-       int channel_cpu;
-       struct ustconsumer_instance *instance;
-};
-
-void *consumer_thread(void *arg)
-{
-       struct buffer_info *buf;
-       struct consumer_thread_args *args = (struct consumer_thread_args *) arg;
-       int result;
-       sigset_t sigset;
-
-       pthread_mutex_lock(&args->instance->mutex);
-       args->instance->active_threads++;
-       pthread_mutex_unlock(&args->instance->mutex);
-
-       if(args->instance->callbacks->on_new_thread)
-               args->instance->callbacks->on_new_thread(args->instance->callbacks);
-
-       /* Block signals that should be handled by the main thread. */
-       result = sigemptyset(&sigset);
-       if(result == -1) {
-               PERROR("sigemptyset");
-               goto end;
-       }
-       result = sigaddset(&sigset, SIGTERM);
-       if(result == -1) {
-               PERROR("sigaddset");
-               goto end;
-       }
-       result = sigaddset(&sigset, SIGINT);
-       if(result == -1) {
-               PERROR("sigaddset");
-               goto end;
-       }
-       result = sigprocmask(SIG_BLOCK, &sigset, NULL);
-       if(result == -1) {
-               PERROR("sigprocmask");
-               goto end;
-       }
-
-       buf = connect_buffer(args->instance, args->pid, args->trace,
-                            args->channel, args->channel_cpu);
-       if(buf == NULL) {
-               ERR("failed to connect to buffer");
-               goto end;
-       }
-
-       consumer_loop(args->instance, buf);
-
-       destroy_buffer(args->instance->callbacks, buf);
-
-       end:
-
-       if(args->instance->callbacks->on_close_thread)
-               args->instance->callbacks->on_close_thread(args->instance->callbacks);
-
-       pthread_mutex_lock(&args->instance->mutex);
-       args->instance->active_threads--;
-       pthread_mutex_unlock(&args->instance->mutex);
-
-       free((void *)args->channel);
-       free(args);
-       return NULL;
-}
-
-int start_consuming_buffer(struct ustconsumer_instance *instance, pid_t pid,
-                          const char *trace, const char *channel,
-                          int channel_cpu)
-{
-       pthread_t thr;
-       struct consumer_thread_args *args;
-       int result;
-
-       DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid, channel,
-           channel_cpu);
-
-       args = (struct consumer_thread_args *) zmalloc(sizeof(struct consumer_thread_args));
-       if (!args) {
-               return -ENOMEM;
-       }
-
-       args->pid = pid;
-       args->trace = strdup(trace);
-       args->channel = strdup(channel);
-       args->channel_cpu = channel_cpu;
-       args->instance = instance;
-       DBG("beginning2 of start_consuming_buffer: args: pid %d trace %s"
-           " bufname %s_%d", args->pid, args->trace, args->channel, args->channel_cpu);
-
-       result = pthread_create(&thr, NULL, consumer_thread, args);
-       if(result == -1) {
-               ERR("pthread_create failed");
-               return -1;
-       }
-       result = pthread_detach(thr);
-       if(result == -1) {
-               ERR("pthread_detach failed");
-               return -1;
-       }
-       DBG("end of start_consuming_buffer: args: pid %d trace %s "
-           "bufname %s_%d", args->pid, args->channel, args->trace, args->channel_cpu);
-
-       return 0;
-}
-static void process_client_cmd(int sock, struct ustcomm_header *req_header,
-                              char *recvbuf, struct ustconsumer_instance *instance)
-{
-       int result;
-       struct ustcomm_header _res_header = {0};
-       struct ustcomm_header *res_header = &_res_header;
-       struct ustcomm_buffer_info *buf_inf;
-
-       DBG("Processing client command");
-
-       switch (req_header->command) {
-       case CONSUME_BUFFER:
-
-               buf_inf = (struct ustcomm_buffer_info *)recvbuf;
-               result = ustcomm_unpack_buffer_info(buf_inf);
-               if (result < 0) {
-                       ERR("Couldn't unpack buffer info");
-                       return;
-               }
-
-               DBG("Going to consume trace %s buffer %s_%d in process %d",
-                   buf_inf->trace, buf_inf->channel, buf_inf->ch_cpu,
-                   buf_inf->pid);
-               result = start_consuming_buffer(instance, buf_inf->pid,
-                                               buf_inf->trace,
-                                               buf_inf->channel,
-                                               buf_inf->ch_cpu);
-               if (result < 0) {
-                       ERR("error in add_buffer");
-                       return;
-               }
-
-               res_header->result = 0;
-               break;
-       case EXIT:
-               res_header->result = 0;
-               /* Only there to force poll to return */
-               break;
-       default:
-               res_header->result = -EINVAL;
-               WARN("unknown command: %d", req_header->command);
-       }
-
-       if (ustcomm_send(sock, res_header, NULL) <= 0) {
-               ERR("couldn't send command response");
-       }
-}
-
-#define MAX_EVENTS 10
-
-int ustconsumer_start_instance(struct ustconsumer_instance *instance)
-{
-       struct ustcomm_header recv_hdr;
-       char recv_buf[USTCOMM_BUFFER_SIZE];
-       struct ustcomm_sock *epoll_sock;
-       struct epoll_event events[MAX_EVENTS];
-       struct sockaddr addr;
-       int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout;
-
-       if(!instance->is_init) {
-               ERR("libustconsumer instance not initialized");
-               return 1;
-       }
-       epoll_fd = instance->epoll_fd;
-
-       timeout = -1;
-
-       /* app loop */
-       for(;;) {
-               nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
-               if (nfds == -1 && errno == EINTR) {
-                       /* Caught signal */
-               } else if (nfds == -1) {
-                       PERROR("ustconsumer_start_instance: epoll_wait failed");
-                       continue;
-               }
-
-               for (i = 0; i < nfds; ++i) {
-                       epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
-                       if (epoll_sock == instance->listen_sock) {
-                               addr_size = sizeof(struct sockaddr);
-                               accept_fd = accept(epoll_sock->fd,
-                                                  &addr,
-                                                  (socklen_t *)&addr_size);
-                               if (accept_fd == -1) {
-                                       PERROR("ustconsumer_start_instance: "
-                                              "accept failed");
-                                       continue;
-                               }
-                               ustcomm_init_sock(accept_fd, epoll_fd,
-                                                &instance->connections);
-                       } else {
-                               result = ustcomm_recv(epoll_sock->fd, &recv_hdr,
-                                                     recv_buf);
-                               if (result < 1) {
-                                       ustcomm_del_sock(epoll_sock, 0);
-                               } else {
-                                       process_client_cmd(epoll_sock->fd,
-                                                          &recv_hdr, recv_buf,
-                                                          instance);
-                               }
-
-                       }
-               }
-
-               if (instance->quit_program) {
-                       pthread_mutex_lock(&instance->mutex);
-                       if (instance->active_buffers == 0 && instance->active_threads == 0) {
-                               pthread_mutex_unlock(&instance->mutex);
-                               break;
-                       }
-                       pthread_mutex_unlock(&instance->mutex);
-                       timeout = 100;
-               }
-       }
-
-       if(instance->callbacks->on_trace_end)
-               instance->callbacks->on_trace_end(instance);
-
-       ustconsumer_delete_instance(instance);
-
-       return 0;
-}
-
-/* FIXME: threads and connections !? */
-void ustconsumer_delete_instance(struct ustconsumer_instance *instance)
-{
-       if (instance->is_init) {
-               ustcomm_del_named_sock(instance->listen_sock, 0);
-               close(instance->epoll_fd);
-       }
-
-       pthread_mutex_destroy(&instance->mutex);
-       free(instance->sock_path);
-       free(instance);
-}
-
-/* FIXME: Do something about the fixed path length, maybe get rid
- * of the whole concept and use a pipe?
- */
-int ustconsumer_stop_instance(struct ustconsumer_instance *instance, int send_msg)
-{
-       int result;
-       int fd;
-       int bytes = 0;
-
-       char msg[] = "exit";
-
-       instance->quit_program = 1;
-
-       if(!send_msg)
-               return 0;
-
-       /* Send a message through the socket to force poll to return */
-
-       struct sockaddr_un addr;
-
-socket_again:
-       result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
-       if(result == -1) {
-               if (errno == EINTR)
-                       goto socket_again;
-               PERROR("socket");
-               return 1;
-       }
-
-       addr.sun_family = AF_UNIX;
-
-       strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX);
-       addr.sun_path[UNIX_PATH_MAX-1] = '\0';
-
-connect_again:
-       result = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
-       if(result == -1) {
-               if (errno == EINTR)
-                       goto connect_again;
-               PERROR("connect");
-       }
-
-       while(bytes != sizeof(msg)) {
-               int inc = send(fd, msg, sizeof(msg), 0);
-               if (inc < 0 && errno != EINTR)
-                       break;
-               else
-                       bytes += inc;
-       }
-
-       close(fd);
-
-       return 0;
-}
-
-struct ustconsumer_instance
-*ustconsumer_new_instance(struct ustconsumer_callbacks *callbacks,
-                     char *sock_path)
-{
-       struct ustconsumer_instance *instance =
-               zmalloc(sizeof(struct ustconsumer_instance));
-       if(!instance) {
-               return NULL;
-       }
-
-       instance->callbacks = callbacks;
-       instance->quit_program = 0;
-       instance->is_init = 0;
-       instance->active_buffers = 0;
-       pthread_mutex_init(&instance->mutex, NULL);
-
-       if (sock_path) {
-               instance->sock_path = strdup(sock_path);
-       } else {
-               instance->sock_path = NULL;
-       }
-
-       return instance;
-}
-
-static int init_ustconsumer_socket(struct ustconsumer_instance *instance)
-{
-       char *name;
-
-       if (instance->sock_path) {
-               if (asprintf(&name, "%s", instance->sock_path) < 0) {
-                       ERR("ustcomm_init_ustconsumer : asprintf failed (sock_path %s)",
-                           instance->sock_path);
-                       return -1;
-               }
-       } else {
-               int result;
-
-               /* Only check if socket dir exists if we are using the default directory */
-               result = ensure_dir_exists(SOCK_DIR, S_IRWXU | S_IRWXG | S_IRWXO);
-               if (result == -1) {
-                       ERR("Unable to create socket directory %s", SOCK_DIR);
-                       return -1;
-               }
-
-               if (asprintf(&name, "%s/%s", SOCK_DIR, "ustconsumer") < 0) {
-                       ERR("ustcomm_init_ustconsumer : asprintf failed (%s/ustconsumer)",
-                           SOCK_DIR);
-                       return -1;
-               }
-       }
-
-       /* Set up epoll */
-       instance->epoll_fd = epoll_create(MAX_EVENTS);
-       if (instance->epoll_fd == -1) {
-               ERR("epoll_create failed, start instance bailing");
-               goto free_name;
-       }
-
-       /* Create the named socket */
-       instance->listen_sock = ustcomm_init_named_socket(name,
-                                                         instance->epoll_fd);
-       if(!instance->listen_sock) {
-               ERR("error initializing named socket at %s", name);
-               goto close_epoll;
-       }
-
-       CDS_INIT_LIST_HEAD(&instance->connections);
-
-       free(name);
-
-       return 0;
-
-close_epoll:
-       close(instance->epoll_fd);
-free_name:
-       free(name);
-
-       return -1;
-}
-
-int ustconsumer_init_instance(struct ustconsumer_instance *instance)
-{
-       int result;
-       result = init_ustconsumer_socket(instance);
-       if(result == -1) {
-               ERR("failed to initialize socket");
-               return 1;
-       }
-       instance->is_init = 1;
-       return 0;
-}
-
This page took 0.031485 seconds and 4 git commands to generate.