From d159ac37826eaebf4c1d87b8db0c5e5a8ccc0b00 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Alexis=20Hall=C3=A9?= Date: Wed, 14 Jul 2010 14:51:56 -0400 Subject: [PATCH] create libustd from ustd This makes libustd a library containing the core functionality of the daemon, and ustd a simple client application using this library, interacting with it via a set of callbacks supplied to it. --- Makefile.am | 2 +- configure.ac | 1 + libustd/Makefile.am | 17 + libustd/libustd.c | 672 ++++++++++++++++++++++++++++++++++ libustd/libustd.h | 279 +++++++++++++++ {ustd => libustd}/lowlevel.c | 25 +- ustd/Makefile.am | 8 +- ustd/ustd.c | 676 +++++++---------------------------- ustd/ustd.h | 58 --- 9 files changed, 1111 insertions(+), 627 deletions(-) create mode 100644 libustd/Makefile.am create mode 100644 libustd/libustd.c create mode 100644 libustd/libustd.h rename {ustd => libustd}/lowlevel.c (89%) delete mode 100644 ustd/ustd.h diff --git a/Makefile.am b/Makefile.am index 5a7c03b..eb0c1b5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -5,7 +5,7 @@ ACLOCAL_AMFLAGS = -I config # libust and '.' (that contains the linker script). However, '.' # must be installed after libust so it can overwrite libust.so with # the linker script. -SUBDIRS = snprintf libustcomm libust . tests libustinstr-malloc ustd ustctl libustfork include doc +SUBDIRS = snprintf libustcomm libust . tests libustinstr-malloc libustd ustd ustctl libustfork include doc EXTRA_DIST = libust.ldscript.in libust-initializer.c dist_bin_SCRIPTS = usttrace diff --git a/configure.ac b/configure.ac index 0b67b38..777af5a 100644 --- a/configure.ac +++ b/configure.ac @@ -123,6 +123,7 @@ AC_CONFIG_FILES([ tests/same_line_marker/Makefile libustinstr-malloc/Makefile libustfork/Makefile + libustd/Makefile ustd/Makefile ustctl/Makefile libustcomm/Makefile diff --git a/libustd/Makefile.am b/libustd/Makefile.am new file mode 100644 index 0000000..1d6d328 --- /dev/null +++ b/libustd/Makefile.am @@ -0,0 +1,17 @@ +AM_CPPFLAGS = -I$(top_srcdir)/libust -I$(top_srcdir)/libustcomm \ + -I$(top_srcdir)/include +AM_CFLAGS = -fno-strict-aliasing + +lib_LTLIBRARIES = libustd.la + +libustd_la_SOURCES = libustd.c lowlevel.c libustd.h + +libustd_la_LDFLAGS = -no-undefined -version-info 0:0:0 + +libustd_la_LIBADD = \ + -lpthread \ + $(top_builddir)/snprintf/libustsnprintf.la \ + $(top_builddir)/libustcomm/libustcomm.la + +libustd_la_CFLAGS = -fno-strict-aliasing + diff --git a/libustd/libustd.c b/libustd/libustd.c new file mode 100644 index 0000000..e0d48b5 --- /dev/null +++ b/libustd/libustd.c @@ -0,0 +1,672 @@ +/* Copyright (C) 2009 Pierre-Marc Fournier + * 2010 Alexis Halle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#define _GNU_SOURCE + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "libustd.h" +#include "usterr.h" +#include "ustcomm.h" + +/* return value: 0 = subbuffer is finished, it won't produce data anymore + * 1 = got subbuffer successfully + * <0 = error + */ + +#define GET_SUBBUF_OK 1 +#define GET_SUBBUF_DONE 0 +#define GET_SUBBUF_DIED 2 + +#define PUT_SUBBUF_OK 1 +#define PUT_SUBBUF_DIED 0 +#define PUT_SUBBUF_PUSHED 2 +#define PUT_SUBBUF_DONE 3 + +#define UNIX_PATH_MAX 108 + +int get_subbuffer(struct buffer_info *buf) +{ + char *send_msg=NULL; + char *received_msg=NULL; + char *rep_code=NULL; + int retval; + int result; + + asprintf(&send_msg, "get_subbuffer %s", buf->name); + result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) { + DBG("app died while being traced"); + retval = GET_SUBBUF_DIED; + goto end; + } + else if(result < 0) { + ERR("get_subbuffer: ustcomm_send_request failed"); + retval = -1; + goto end; + } + + result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old); + if(result != 2 && result != 1) { + ERR("unable to parse response to get_subbuffer"); + retval = -1; + free(received_msg); + goto end_rep; + } + + if(!strcmp(rep_code, "OK")) { + DBG("got subbuffer %s", buf->name); + retval = GET_SUBBUF_OK; + } + else if(nth_token_is(received_msg, "END", 0) == 1) { + retval = GET_SUBBUF_DONE; + goto end_rep; + } + else if(!strcmp(received_msg, "NOTFOUND")) { + DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name); + retval = GET_SUBBUF_DIED; + goto end_rep; + } + else { + DBG("error getting subbuffer %s", buf->name); + retval = -1; + } + + /* FIXME: free correctly the stuff */ +end_rep: + if(rep_code) + free(rep_code); +end: + if(send_msg) + free(send_msg); + if(received_msg) + free(received_msg); + + return retval; +} + +int put_subbuffer(struct buffer_info *buf) +{ + char *send_msg=NULL; + char *received_msg=NULL; + char *rep_code=NULL; + int retval; + int result; + + asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old); + result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) { + retval = PUT_SUBBUF_DIED; + goto end; + } + else if(result < 0) { + ERR("put_subbuffer: send_message failed"); + retval = -1; + goto end; + } + else if(result == 0) { + /* Program seems finished. However this might not be + * the last subbuffer that has to be collected. + */ + retval = PUT_SUBBUF_DIED; + goto end; + } + + result = sscanf(received_msg, "%as", &rep_code); + if(result != 1) { + ERR("unable to parse response to put_subbuffer"); + retval = -1; + goto end_rep; + } + + if(!strcmp(rep_code, "OK")) { + DBG("subbuffer put %s", buf->name); + retval = PUT_SUBBUF_OK; + } + else if(!strcmp(received_msg, "NOTFOUND")) { + DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name); + /* However, maybe this was not the last subbuffer. So + * we return the program died. + */ + retval = PUT_SUBBUF_DIED; + goto end_rep; + } + else { + DBG("put_subbuffer: received error, we were pushed"); + retval = PUT_SUBBUF_PUSHED; + goto end_rep; + } + +end_rep: + if(rep_code) + free(rep_code); + +end: + if(send_msg) + free(send_msg); + if(received_msg) + free(received_msg); + + return retval; +} + +void decrement_active_buffers(void *arg) +{ + struct libustd_instance *instance = arg; + pthread_mutex_lock(&instance->mutex); + instance->active_buffers--; + pthread_mutex_unlock(&instance->mutex); +} + +struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, const char *bufname) +{ + struct buffer_info *buf; + char *send_msg; + char *received_msg; + int result; + char *tmp; + int fd; + struct shmid_ds shmds; + + buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); + if(buf == NULL) { + ERR("add_buffer: insufficient memory"); + return NULL; + } + + buf->name = bufname; + buf->pid = pid; + + /* connect to app */ + result = ustcomm_connect_app(buf->pid, &buf->conn); + if(result) { + WARN("unable to connect to process, it probably died before we were able to connect"); + return NULL; + } + + /* get pidunique */ + asprintf(&send_msg, "get_pidunique"); + result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + free(send_msg); + if(result == -1) { + ERR("problem in ustcomm_send_request(get_pidunique)"); + return NULL; + } + if(result == 0) { + goto error; + } + + result = sscanf(received_msg, "%lld", &buf->pidunique); + if(result != 1) { + ERR("unable to parse response to get_pidunique"); + return NULL; + } + free(received_msg); + DBG("got pidunique %lld", buf->pidunique); + + /* get shmid */ + asprintf(&send_msg, "get_shmid %s", buf->name); + result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + free(send_msg); + if(result == -1) { + ERR("problem in ustcomm_send_request(get_shmid)"); + return NULL; + } + if(result == 0) { + goto error; + } + + result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid); + if(result != 2) { + ERR("unable to parse response to get_shmid (\"%s\")", received_msg); + return NULL; + } + free(received_msg); + DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid); + + /* get n_subbufs */ + asprintf(&send_msg, "get_n_subbufs %s", buf->name); + result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + free(send_msg); + if(result == -1) { + ERR("problem in ustcomm_send_request(g_n_subbufs)"); + return NULL; + } + if(result == 0) { + goto error; + } + + result = sscanf(received_msg, "%d", &buf->n_subbufs); + if(result != 1) { + ERR("unable to parse response to get_n_subbufs"); + return NULL; + } + free(received_msg); + DBG("got n_subbufs %d", buf->n_subbufs); + + /* get subbuf size */ + asprintf(&send_msg, "get_subbuf_size %s", buf->name); + result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + free(send_msg); + if(result == -1) { + ERR("problem in ustcomm_send_request(get_subbuf_size)"); + return NULL; + } + if(result == 0) { + goto error; + } + + result = sscanf(received_msg, "%d", &buf->subbuf_size); + if(result != 1) { + ERR("unable to parse response to get_subbuf_size"); + return NULL; + } + free(received_msg); + DBG("got subbuf_size %d", buf->subbuf_size); + + /* attach memory */ + buf->mem = shmat(buf->shmid, NULL, 0); + if(buf->mem == (void *) 0) { + PERROR("shmat"); + return NULL; + } + DBG("successfully attached buffer memory"); + + buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0); + if(buf->bufstruct_mem == (void *) 0) { + PERROR("shmat"); + return NULL; + } + DBG("successfully attached buffer bufstruct memory"); + + /* obtain info on the memory segment */ + result = shmctl(buf->shmid, IPC_STAT, &shmds); + if(result == -1) { + PERROR("shmctl"); + return NULL; + } + buf->memlen = shmds.shm_segsz; + + if(instance->callbacks->on_open_buffer) + instance->callbacks->on_open_buffer(instance->callbacks, buf); + + pthread_mutex_lock(&instance->mutex); + instance->active_buffers++; + pthread_mutex_unlock(&instance->mutex); + + return buf; + +error: + free(buf); + return NULL; +} + +static void destroy_buffer(struct libustd_callbacks *callbacks, + struct buffer_info *buf) +{ + int result; + + result = ustcomm_close_app(&buf->conn); + if(result == -1) { + WARN("problem calling ustcomm_close_app"); + } + + result = shmdt(buf->mem); + if(result == -1) { + PERROR("shmdt"); + } + + result = shmdt(buf->bufstruct_mem); + if(result == -1) { + PERROR("shmdt"); + } + + if(callbacks->on_close_buffer) + callbacks->on_close_buffer(callbacks, buf); + + free(buf); +} + +int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf) +{ + int result; + + pthread_cleanup_push(decrement_active_buffers, instance); + + for(;;) { + /* get the subbuffer */ + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } + else if(result == GET_SUBBUF_DONE) { + /* this is done */ + break; + } + else if(result == GET_SUBBUF_DIED) { + finish_consuming_dead_subbuffer(instance->callbacks, buf); + break; + } + + if(instance->callbacks->on_read_subbuffer) + instance->callbacks->on_read_subbuffer(instance->callbacks, buf); + + /* put the subbuffer */ + result = put_subbuffer(buf); + if(result == -1) { + ERR("unknown error putting subbuffer (channel=%s)", buf->name); + break; + } + else if(result == PUT_SUBBUF_PUSHED) { + ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name); + break; + } + else if(result == PUT_SUBBUF_DIED) { + DBG("application died while putting subbuffer"); + /* Skip the first subbuffer. We are not sure it is trustable + * because the put_subbuffer() did not complete. + */ + if(instance->callbacks->on_put_error) + instance->callbacks->on_put_error(instance->callbacks, buf); + + finish_consuming_dead_subbuffer(instance->callbacks, buf); + break; + } + else if(result == PUT_SUBBUF_DONE) { + /* Done with this subbuffer */ + /* FIXME: add a case where this branch is used? Upon + * normal trace termination, at put_subbuf time, a + * special last-subbuffer code could be returned by + * the listener. + */ + break; + } + else if(result == PUT_SUBBUF_OK) { + } + } + + DBG("thread for buffer %s is stopping", buf->name); + + /* FIXME: destroy, unalloc... */ + + pthread_cleanup_pop(1); + + return 0; +} + +struct consumer_thread_args { + pid_t pid; + const char *bufname; + struct libustd_instance *instance; +}; + +void *consumer_thread(void *arg) +{ + struct buffer_info *buf; + struct consumer_thread_args *args = (struct consumer_thread_args *) arg; + int result; + sigset_t sigset; + + DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname); + + if(args->instance->callbacks->on_new_thread) + args->instance->callbacks->on_new_thread(args->instance->callbacks); + + /* Block signals that should be handled by the main thread. */ + result = sigemptyset(&sigset); + if(result == -1) { + PERROR("sigemptyset"); + goto end; + } + result = sigaddset(&sigset, SIGTERM); + if(result == -1) { + PERROR("sigaddset"); + goto end; + } + result = sigaddset(&sigset, SIGINT); + if(result == -1) { + PERROR("sigaddset"); + goto end; + } + result = sigprocmask(SIG_BLOCK, &sigset, NULL); + if(result == -1) { + PERROR("sigprocmask"); + goto end; + } + + buf = connect_buffer(args->instance, args->pid, args->bufname); + if(buf == NULL) { + ERR("failed to connect to buffer"); + goto end; + } + + consumer_loop(args->instance, buf); + + destroy_buffer(args->instance->callbacks, buf); + + end: + + if(args->instance->callbacks->on_close_thread) + args->instance->callbacks->on_close_thread(args->instance->callbacks); + + free((void *)args->bufname); + free(args); + return NULL; +} + +int start_consuming_buffer( + struct libustd_instance *instance, pid_t pid, const char *bufname) +{ + pthread_t thr; + struct consumer_thread_args *args; + int result; + + DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname); + + args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args)); + + args->pid = pid; + args->bufname = strdup(bufname); + args->instance = instance; + DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); + + result = pthread_create(&thr, NULL, consumer_thread, args); + if(result == -1) { + ERR("pthread_create failed"); + return -1; + } + result = pthread_detach(thr); + if(result == -1) { + ERR("pthread_detach failed"); + return -1; + } + DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); + + return 0; +} + +int libustd_start_instance(struct libustd_instance *instance) +{ + int result; + int timeout = -1; + + if(!instance->is_init) { + ERR("libustd instance not initialized"); + return 1; + } + + /* app loop */ + for(;;) { + char *recvbuf; + + /* check for requests on our public socket */ + result = ustcomm_ustd_recv_message(&instance->comm, &recvbuf, NULL, timeout); + if(result == -1 && errno == EINTR) { + /* Caught signal */ + } + else if(result == -1) { + ERR("error in ustcomm_ustd_recv_message"); + goto loop_end; + } + else if(result > 0) { + if(!strncmp(recvbuf, "collect", 7)) { + pid_t pid; + char *bufname; + int result; + + result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); + if(result != 2) { + ERR("parsing error: %s", recvbuf); + goto free_bufname; + } + + result = start_consuming_buffer(instance, pid, bufname); + if(result < 0) { + ERR("error in add_buffer"); + goto free_bufname; + } + + free_bufname: + free(bufname); + } + else if(!strncmp(recvbuf, "exit", 4)) { + /* Only there to force poll to return */ + } + else { + WARN("unknown command: %s", recvbuf); + } + + free(recvbuf); + } + + loop_end: + + if(instance->quit_program) { + pthread_mutex_lock(&instance->mutex); + if(instance->active_buffers == 0) { + pthread_mutex_unlock(&instance->mutex); + break; + } + pthread_mutex_unlock(&instance->mutex); + timeout = 100; + } + } + + if(instance->callbacks->on_trace_end) + instance->callbacks->on_trace_end(instance); + + libustd_delete_instance(instance); + + return 0; +} + +void libustd_delete_instance(struct libustd_instance *instance) +{ + if(instance->is_init) + ustcomm_fini_ustd(&instance->comm); + + pthread_mutex_destroy(&instance->mutex); + free(instance->sock_path); + free(instance); +} + +int libustd_stop_instance(struct libustd_instance *instance, int send_msg) +{ + int result; + int fd; + int bytes = 0; + + char msg[] = "exit"; + + instance->quit_program = 1; + + if(!send_msg) + return 0; + + /* Send a message through the socket to force poll to return */ + + struct sockaddr_un addr; + + result = fd = socket(PF_UNIX, SOCK_STREAM, 0); + if(result == -1) { + PERROR("socket"); + return 1; + } + + addr.sun_family = AF_UNIX; + + strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX); + addr.sun_path[UNIX_PATH_MAX-1] = '\0'; + + result = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); + if(result == -1) { + PERROR("connect"); + } + + while(bytes != sizeof(msg)) + bytes += send(fd, msg, sizeof(msg), 0); + + close(fd); + + return 0; +} + +struct libustd_instance *libustd_new_instance( + struct libustd_callbacks *callbacks, char *sock_path) +{ + struct libustd_instance *instance = + malloc(sizeof(struct libustd_instance)); + if(!instance) + return NULL; + + instance->callbacks = callbacks; + instance->quit_program = 0; + instance->is_init = 0; + instance->active_buffers = 0; + pthread_mutex_init(&instance->mutex, NULL); + + if(sock_path) + instance->sock_path = strdup(sock_path); + else + instance->sock_path = NULL; + + return instance; +} + +int libustd_init_instance(struct libustd_instance *instance) +{ + int result; + result = ustcomm_init_ustd(&instance->comm, instance->sock_path); + if(result == -1) { + ERR("failed to initialize socket"); + return 1; + } + instance->is_init = 1; + return 0; +} + diff --git a/libustd/libustd.h b/libustd/libustd.h new file mode 100644 index 0000000..f08ce72 --- /dev/null +++ b/libustd/libustd.h @@ -0,0 +1,279 @@ +/* + * libustd header file + * + * Copyright 2005-2010 - + * Mathieu Desnoyers + * Copyright 2010- + * Oumarou Dicko + * Michael Sills-Lavoie + * Alexis Halle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LIBUSTD_H +#define LIBUSTD_H + +#include +#include +#include "ustcomm.h" + +#define USTD_DEFAULT_TRACE_PATH "/tmp/usttrace" + +struct buffer_info { + const char *name; + pid_t pid; + struct ustcomm_connection conn; + + int shmid; + int bufstruct_shmid; + + /* the buffer memory */ + void *mem; + /* buffer size */ + int memlen; + /* number of subbuffers in buffer */ + int n_subbufs; + /* size of each subbuffer */ + int subbuf_size; + + /* the buffer information struct */ + void *bufstruct_mem; + + long consumed_old; + + s64 pidunique; + + void *user_data; +}; + +struct libustd_callbacks; + +/** + * struct libustd_instance - Contains the data associated with a trace instance. + * The lib user can read but MUST NOT change any attributes but callbacks. + * @callbacks: Contains the necessary callbacks for a tracing session. + */ +struct libustd_instance { + struct libustd_callbacks *callbacks; + int quit_program; + int is_init; + struct ustcomm_ustd comm; + char *sock_path; + pthread_mutex_t mutex; + int active_buffers; +}; + +/** +* struct libustd_callbacks - Contains the necessary callbacks for a tracing +* session. The user can set the unnecessary functions to NULL if he does not +* need them. +*/ +struct libustd_callbacks { + /** + * on_open_buffer - Is called after a buffer is attached to process memory + * + * @data: pointer to the callbacks structure that has been passed to the + * library. + * @buf: structure that contains the data associated with the buffer + * + * Returns 0 if the callback succeeds else not 0. + * + * It has to be thread safe, because it is called by many threads. + */ + int (*on_open_buffer)(struct libustd_callbacks *data, + struct buffer_info *buf); + + /** + * on_close_buffer - Is called after a buffer is detached from process memory + * + * @data: pointer to the callbacks structure that has been passed to the + * library. + * @buf: structure that contains the data associated with the buffer + * + * Returns 0 if the callback succeeds else not 0. + * + * It has to be thread safe, because it is called by many threads. + */ + int (*on_close_buffer)(struct libustd_callbacks *data, + struct buffer_info *buf); + + /** + * on_read_subbuffer - Is called after a subbuffer is a reserved. + * + * @data: pointer to the callbacks structure that has been passed to the + * library. + * @buf: structure that contains the data associated with the buffer + * + * Returns 0 if the callback succeeds else not 0. + * + * It has to be thread safe, because it is called by many threads. + */ + int (*on_read_subbuffer)(struct libustd_callbacks *data, + struct buffer_info *buf); + + /** + * on_read_partial_subbuffer - Is called when an incomplete subbuffer + * is being salvaged from an app crash + * + * @data: pointer to the callbacks structure that has been passed to the + * library. + * @buf: structure that contains the data associated with the buffer + * @subbuf_index: index of the subbuffer to read in the buffer + * @valid_length: number of bytes considered safe to read + * + * Returns 0 if the callback succeeds else not 0. + * + * It has to be thread safe, because it is called by many threads. + */ + int (*on_read_partial_subbuffer)(struct libustd_callbacks *data, + struct buffer_info *buf, + long subbuf_index, + unsigned long valid_length); + + /** + * on_put_error - Is called when a put error has occured and the last + * subbuffer read is no longer safe to keep + * + * @data: pointer to the callbacks structure that has been passed to the + * library. + * @buf: structure that contains the data associated with the buffer + * + * Returns 0 if the callback succeeds else not 0. + * + * It has to be thread safe, because it is called by many threads. + */ + int (*on_put_error)(struct libustd_callbacks *data, + struct buffer_info *buf); + + /** + * on_new_thread - Is called when a new thread is created + * + * @data: pointer to the callbacks structure that has been passed to the + * library. + * + * Returns 0 if the callback succeeds else not 0. + * + * It has to be thread safe, because it is called by many threads. + */ + int (*on_new_thread)(struct libustd_callbacks *data); + + /** + * on_close_thread - Is called just before a thread is destroyed + * + * @data: pointer to the callbacks structure that has been passed to the + * library. + * + * Returns 0 if the callback succeeds else not 0. + * + * It has to be thread safe, because it is called by many threads. + */ + int (*on_close_thread)(struct libustd_callbacks *data); + + /** + * on_trace_end - Is called at the very end of the tracing session. At + * this time, everything has been closed and the threads have + * been destroyed. + * + * @instance: pointer to the instance structure that has been passed to + * the library. + * + * Returns 0 if the callback succeeds else not 0. + * + * After this callback is called, no other callback will be called + * again and the tracing instance will be deleted automatically by + * libustd. After this call, the user must not use the libustd instance. + */ + int (*on_trace_end)(struct libustd_instance *instance); + + /** + * The library's data. + */ + void *user_data; +}; + +/** + * libustd_new_instance - Is called to create a new tracing session. + * + * @callbacks: Pointer to a callbacks structure that contain the user + * callbacks and data. + * @sock_path: Path to the socket used for communication with the traced app + * + * Returns the instance if the function succeeds else NULL. + */ +struct libustd_instance * +libustd_new_instance( + struct libustd_callbacks *callbacks, char *sock_path); + +/** + * libustd_delete_instance - Is called to free a libustd_instance struct + * + * @instance: The tracing session instance that needs to be freed. + * + * This function should only be called if the instance has not been started, + * as it will automatically be called at the end of libustd_start_instance. + */ +void libustd_delete_instance(struct libustd_instance *instance); + +/** + * libustd_init_instance - Is called to initiliaze a new tracing session + * + * @instance: The tracing session instance that needs to be started. + * + * Returns 0 if the function succeeds. + * + * This function must be called between libustd_new_instance and + * libustd_start_instance. It sets up the communication between the library + * and the tracing application. + */ +int libustd_init_instance(struct libustd_instance *instance); + +/** + * libustd_start_instance - Is called to start a new tracing session. + * + * @instance: The tracing session instance that needs to be started. + * + * Returns 0 if the function succeeds. + * + * This is a blocking function. The caller will be blocked on it until the + * tracing session is stopped by the user using libustd_stop_instance or until + * the traced application terminates + */ +int libustd_start_instance(struct libustd_instance *instance); + +/** + * libustd_stop_instance - Is called to stop a tracing session. + * + * @instance: The tracing session instance that needs to be stoped. + * @send_msg: If true, a message will be sent to the listening thread through + * the daemon socket to force it to return from the poll syscall + * and realize that it must close. This is not necessary if the + * instance is being stopped as part of an interrupt handler, as + * the interrupt itself will cause poll to return. + * + * Returns 0 if the function succeeds. + * + * This function returns immediately, it only tells libustd to stop the + * instance. The on_trace_end callback will be called when the tracing session + * will really be stopped. The instance is deleted automatically by libustd + * after on_trace_end is called. + */ +int libustd_stop_instance(struct libustd_instance *instance, int send_msg); + +void finish_consuming_dead_subbuffer(struct libustd_callbacks *callbacks, struct buffer_info *buf); +size_t subbuffer_data_size(void *subbuf); + +#endif /* LIBUSTD_H */ + diff --git a/ustd/lowlevel.c b/libustd/lowlevel.c similarity index 89% rename from ustd/lowlevel.c rename to libustd/lowlevel.c index a10f931..62c8e6b 100644 --- a/ustd/lowlevel.c +++ b/libustd/lowlevel.c @@ -21,7 +21,7 @@ #include "buffers.h" #include "tracer.h" -#include "ustd.h" +#include "libustd.h" #include "usterr.h" /* This truncates to an offset in the buffer. */ @@ -62,7 +62,7 @@ size_t subbuffer_data_size(void *subbuf) } -void finish_consuming_dead_subbuffer(struct buffer_info *buf) +void finish_consuming_dead_subbuffer(struct libustd_callbacks *callbacks, struct buffer_info *buf) { int result; @@ -138,25 +138,8 @@ void finish_consuming_dead_subbuffer(struct buffer_info *buf) assert(i_subbuf == (last_subbuf % buf->n_subbufs)); } - - result = patient_write(buf->file_fd, buf->mem + i_subbuf * buf->subbuf_size, valid_length); - if(result == -1) { - ERR("Error writing to buffer file"); - return; - } - - /* pad with empty bytes */ - pad_size = PAGE_ALIGN(valid_length)-valid_length; - if(pad_size) { - tmp = malloc(pad_size); - memset(tmp, 0, pad_size); - result = patient_write(buf->file_fd, tmp, pad_size); - if(result == -1) { - ERR("Error writing to buffer file"); - return; - } - free(tmp); - } + if(callbacks->on_read_partial_subbuffer) + callbacks->on_read_partial_subbuffer(callbacks, buf, i_subbuf, valid_length); if(i_subbuf == last_subbuf % buf->n_subbufs) break; diff --git a/ustd/Makefile.am b/ustd/Makefile.am index 09d0984..991c717 100644 --- a/ustd/Makefile.am +++ b/ustd/Makefile.am @@ -1,14 +1,14 @@ AM_CPPFLAGS = -I$(top_srcdir)/libust -I$(top_srcdir)/libustcomm \ - -I$(top_srcdir)/include + -I$(top_srcdir)/include -I$(top_srcdir)/libustd AM_CFLAGS = -fno-strict-aliasing bin_PROGRAMS = ustd -ustd_SOURCES = lowlevel.c ustd.c ustd.h +ustd_SOURCES = ustd.c ustd_LDADD = \ $(top_builddir)/snprintf/libustsnprintf.la \ - $(top_builddir)/libustcomm/libustcomm.la + $(top_builddir)/libustcomm/libustcomm.la \ + $(top_builddir)/libustd/libustd.la -ustd_LDFLAGS = -lpthread ustd_CFLAGS = -DUST_COMPONENT=ustd -fno-strict-aliasing diff --git a/ustd/ustd.c b/ustd/ustd.c index 8a1f4d6..cca9520 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -1,4 +1,5 @@ /* Copyright (C) 2009 Pierre-Marc Fournier + * 2010 Alexis Halle * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -22,7 +23,6 @@ #include #include #include -#include #include #include @@ -32,165 +32,39 @@ #include #include -#include "ustd.h" +#include "libustd.h" #include "usterr.h" -#include "ustcomm.h" - -/* return value: 0 = subbuffer is finished, it won't produce data anymore - * 1 = got subbuffer successfully - * <0 = error - */ - -#define GET_SUBBUF_OK 1 -#define GET_SUBBUF_DONE 0 -#define GET_SUBBUF_DIED 2 - -#define PUT_SUBBUF_OK 1 -#define PUT_SUBBUF_DIED 0 -#define PUT_SUBBUF_PUSHED 2 -#define PUT_SUBBUF_DONE 3 char *sock_path=NULL; char *trace_path=NULL; int daemon_mode = 0; char *pidfile = NULL; -/* Number of active buffers and the mutex to protect it. */ -int active_buffers = 0; -pthread_mutex_t active_buffers_mutex = PTHREAD_MUTEX_INITIALIZER; -/* Whether a request to end the program was received. */ -volatile sig_atomic_t terminate_req = 0; - -int get_subbuffer(struct buffer_info *buf) -{ - char *send_msg=NULL; - char *received_msg=NULL; - char *rep_code=NULL; - int retval; - int result; - - asprintf(&send_msg, "get_subbuffer %s", buf->name); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); - if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) { - DBG("app died while being traced"); - retval = GET_SUBBUF_DIED; - goto end; - } - else if(result < 0) { - ERR("get_subbuffer: ustcomm_send_request failed"); - retval = -1; - goto end; - } - - result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old); - if(result != 2 && result != 1) { - ERR("unable to parse response to get_subbuffer"); - retval = -1; - free(received_msg); - goto end_rep; - } - - if(!strcmp(rep_code, "OK")) { - DBG("got subbuffer %s", buf->name); - retval = GET_SUBBUF_OK; - } - else if(nth_token_is(received_msg, "END", 0) == 1) { - retval = GET_SUBBUF_DONE; - goto end_rep; - } - else if(!strcmp(received_msg, "NOTFOUND")) { - DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name); - retval = GET_SUBBUF_DIED; - goto end_rep; - } - else { - DBG("error getting subbuffer %s", buf->name); - retval = -1; - } +struct libustd_instance *instance; - /* FIMXE: free correctly the stuff */ -end_rep: - if(rep_code) - free(rep_code); -end: - if(send_msg) - free(send_msg); - if(received_msg) - free(received_msg); - - return retval; -} +struct buffer_info_local { + /* output file */ + int file_fd; + /* the offset we must truncate to, to unput the last subbuffer */ + off_t previous_offset; +}; -int put_subbuffer(struct buffer_info *buf) +static int write_pidfile(const char *file_name, pid_t pid) { - char *send_msg=NULL; - char *received_msg=NULL; - char *rep_code=NULL; - int retval; - int result; - - asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); - if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) { - retval = PUT_SUBBUF_DIED; - goto end; - } - else if(result < 0) { - ERR("put_subbuffer: send_message failed"); - retval = -1; - goto end; - } - else if(result == 0) { - /* Program seems finished. However this might not be - * the last subbuffer that has to be collected. - */ - retval = PUT_SUBBUF_DIED; - goto end; - } - - result = sscanf(received_msg, "%as", &rep_code); - if(result != 1) { - ERR("unable to parse response to put_subbuffer"); - retval = -1; - goto end_rep; - } + FILE *pidfp; - if(!strcmp(rep_code, "OK")) { - DBG("subbuffer put %s", buf->name); - retval = PUT_SUBBUF_OK; - } - else if(!strcmp(received_msg, "NOTFOUND")) { - DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name); - /* However, maybe this was not the last subbuffer. So - * we return the program died. - */ - retval = PUT_SUBBUF_DIED; - goto end_rep; - } - else { - DBG("put_subbuffer: received error, we were pushed"); - retval = PUT_SUBBUF_PUSHED; - goto end_rep; + pidfp = fopen(file_name, "w"); + if(!pidfp) { + PERROR("fopen (%s)", file_name); + WARN("killing child process"); + return -1; } -end_rep: - if(rep_code) - free(rep_code); - -end: - if(send_msg) - free(send_msg); - if(received_msg) - free(received_msg); + fprintf(pidfp, "%d\n", pid); - return retval; -} + fclose(pidfp); -void decrement_active_buffers(void *arg) -{ - pthread_mutex_lock(&active_buffers_mutex); - active_buffers--; - pthread_mutex_unlock(&active_buffers_mutex); + return 0; } int create_dir_if_needed(char *dir) @@ -207,152 +81,101 @@ int create_dir_if_needed(char *dir) return 0; } -int is_directory(const char *dir) +int unwrite_last_subbuffer(struct buffer_info *buf) { int result; - struct stat st; + struct buffer_info_local *buf_local = buf->user_data; - result = stat(dir, &st); + result = ftruncate(buf_local->file_fd, buf_local->previous_offset); if(result == -1) { - PERROR("stat"); - return 0; + PERROR("ftruncate"); + return -1; } - if(!S_ISDIR(st.st_mode)) { - return 0; + result = lseek(buf_local->file_fd, buf_local->previous_offset, SEEK_SET); + if(result == (int)(off_t)-1) { + PERROR("lseek"); + return -1; } - return 1; + return 0; } -struct buffer_info *connect_buffer(pid_t pid, const char *bufname) +int write_current_subbuffer(struct buffer_info *buf) { - struct buffer_info *buf; - char *send_msg; - char *received_msg; int result; - char *tmp; - int fd; - struct shmid_ds shmds; - - buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); - if(buf == NULL) { - ERR("add_buffer: insufficient memory"); - return NULL; - } + struct buffer_info_local *buf_local = buf->user_data; - buf->name = bufname; - buf->pid = pid; + void *subbuf_mem = buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)); - /* connect to app */ - result = ustcomm_connect_app(buf->pid, &buf->conn); - if(result) { - WARN("unable to connect to process, it probably died before we were able to connect"); - return NULL; - } + size_t cur_sb_size = subbuffer_data_size(subbuf_mem); - /* get pidunique */ - asprintf(&send_msg, "get_pidunique"); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); - free(send_msg); - if(result == -1) { - ERR("problem in ustcomm_send_request(get_pidunique)"); - return NULL; - } - if(result == 0) { - goto error; + off_t cur_offset = lseek(buf_local->file_fd, 0, SEEK_CUR); + if(cur_offset == (off_t)-1) { + PERROR("lseek"); + return -1; } - result = sscanf(received_msg, "%lld", &buf->pidunique); - if(result != 1) { - ERR("unable to parse response to get_pidunique"); - return NULL; - } - free(received_msg); - DBG("got pidunique %lld", buf->pidunique); + buf_local->previous_offset = cur_offset; + DBG("previous_offset: %ld", cur_offset); - /* get shmid */ - asprintf(&send_msg, "get_shmid %s", buf->name); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); - free(send_msg); + result = patient_write(buf_local->file_fd, subbuf_mem, cur_sb_size); if(result == -1) { - ERR("problem in ustcomm_send_request(get_shmid)"); - return NULL; - } - if(result == 0) { - goto error; + PERROR("write"); + return -1; } - result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid); - if(result != 2) { - ERR("unable to parse response to get_shmid (\"%s\")", received_msg); - return NULL; - } - free(received_msg); - DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid); + return 0; +} - /* get n_subbufs */ - asprintf(&send_msg, "get_n_subbufs %s", buf->name); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); - free(send_msg); - if(result == -1) { - ERR("problem in ustcomm_send_request(g_n_subbufs)"); - return NULL; - } - if(result == 0) { - goto error; - } +int on_read_subbuffer(struct libustd_callbacks *data, struct buffer_info *buf) +{ + return write_current_subbuffer(buf); +} - result = sscanf(received_msg, "%d", &buf->n_subbufs); - if(result != 1) { - ERR("unable to parse response to get_n_subbufs"); - return NULL; - } - free(received_msg); - DBG("got n_subbufs %d", buf->n_subbufs); +int on_read_partial_subbuffer(struct libustd_callbacks *data, struct buffer_info *buf, + long subbuf_index, unsigned long valid_length) +{ + struct buffer_info_local *buf_local = buf->user_data; + char *tmp; + int result; + unsigned long pad_size; - /* get subbuf size */ - asprintf(&send_msg, "get_subbuf_size %s", buf->name); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); - free(send_msg); + result = patient_write(buf_local->file_fd, buf->mem + subbuf_index * buf->subbuf_size, valid_length); if(result == -1) { - ERR("problem in ustcomm_send_request(get_subbuf_size)"); - return NULL; - } - if(result == 0) { - goto error; + ERR("Error writing to buffer file"); + return; } - result = sscanf(received_msg, "%d", &buf->subbuf_size); - if(result != 1) { - ERR("unable to parse response to get_subbuf_size"); - return NULL; + /* pad with empty bytes */ + pad_size = PAGE_ALIGN(valid_length)-valid_length; + if(pad_size) { + tmp = malloc(pad_size); + memset(tmp, 0, pad_size); + result = patient_write(buf_local->file_fd, tmp, pad_size); + if(result == -1) { + ERR("Error writing to buffer file"); + return; + } + free(tmp); } - free(received_msg); - DBG("got subbuf_size %d", buf->subbuf_size); - /* attach memory */ - buf->mem = shmat(buf->shmid, NULL, 0); - if(buf->mem == (void *) 0) { - PERROR("shmat"); - return NULL; - } - DBG("successfully attached buffer memory"); +} - buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0); - if(buf->bufstruct_mem == (void *) 0) { - PERROR("shmat"); - return NULL; - } - DBG("successfully attached buffer bufstruct memory"); +int on_open_buffer(struct libustd_callbacks *data, struct buffer_info *buf) +{ + char *tmp; + int result; + int fd; + struct buffer_info_local *buf_local = + malloc(sizeof(struct buffer_info_local)); - /* obtain info on the memory segment */ - result = shmctl(buf->shmid, IPC_STAT, &shmds); - if(result == -1) { - PERROR("shmctl"); - return NULL; + if(!buf_local) { + ERR("could not allocate buffer_info_local struct"); + return 1; } - buf->memlen = shmds.shm_segsz; + + buf->user_data = buf_local; /* open file for output */ if(!trace_path) { @@ -363,7 +186,7 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) result = create_dir_if_needed(USTD_DEFAULT_TRACE_PATH); if(result == -1) { ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH); - return NULL; + return 1; } trace_path = USTD_DEFAULT_TRACE_PATH; @@ -374,7 +197,7 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) if(result == -1) { ERR("could not create directory %s", tmp); free(tmp); - return NULL; + return 1; } free(tmp); @@ -383,242 +206,67 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) if(result == -1) { PERROR("open"); ERR("failed opening trace file %s", tmp); - return NULL; + return 1; } - buf->file_fd = fd; + buf_local->file_fd = fd; free(tmp); - pthread_mutex_lock(&active_buffers_mutex); - active_buffers++; - pthread_mutex_unlock(&active_buffers_mutex); - - return buf; - -error: - free(buf); - return NULL; + return 0; } -static void destroy_buffer(struct buffer_info *buf) +int on_close_buffer(struct libustd_callbacks *data, struct buffer_info *buf) { - int result; - - result = ustcomm_close_app(&buf->conn); - if(result == -1) { - WARN("problem calling ustcomm_close_app"); - } - - result = shmdt(buf->mem); - if(result == -1) { - PERROR("shmdt"); - } - - result = shmdt(buf->bufstruct_mem); - if(result == -1) { - PERROR("shmdt"); - } - - result = close(buf->file_fd); + struct buffer_info_local *buf_local = buf->user_data; + int result = close(buf_local->file_fd); + free(buf_local); if(result == -1) { PERROR("close"); } - - free(buf); -} - -int unwrite_last_subbuffer(struct buffer_info *buf) -{ - int result; - - result = ftruncate(buf->file_fd, buf->previous_offset); - if(result == -1) { - PERROR("ftruncate"); - return -1; - } - - result = lseek(buf->file_fd, buf->previous_offset, SEEK_SET); - if(result == (int)(off_t)-1) { - PERROR("lseek"); - return -1; - } - return 0; } -int write_current_subbuffer(struct buffer_info *buf) +int on_put_error(struct libustd_callbacks *data, struct buffer_info *buf) { - int result; - - void *subbuf_mem = buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)); - - size_t cur_sb_size = subbuffer_data_size(subbuf_mem); - - off_t cur_offset = lseek(buf->file_fd, 0, SEEK_CUR); - if(cur_offset == (off_t)-1) { - PERROR("lseek"); - return -1; - } - - buf->previous_offset = cur_offset; - DBG("previous_offset: %ld", cur_offset); - - result = patient_write(buf->file_fd, subbuf_mem, cur_sb_size); - if(result == -1) { - PERROR("write"); - return -1; - } - - return 0; + unwrite_last_subbuffer(buf); } -int consumer_loop(struct buffer_info *buf) +struct libustd_callbacks *new_callbacks() { - int result; - - pthread_cleanup_push(decrement_active_buffers, NULL); - - for(;;) { - /* get the subbuffer */ - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; - } - else if(result == GET_SUBBUF_DONE) { - /* this is done */ - break; - } - else if(result == GET_SUBBUF_DIED) { - finish_consuming_dead_subbuffer(buf); - break; - } + struct libustd_callbacks *callbacks = + malloc(sizeof(struct libustd_callbacks)); - /* write data to file */ - result = write_current_subbuffer(buf); - if(result == -1) { - ERR("Failed writing a subbuffer to file (channel=%s). Dropping this buffer.", buf->name); - } - - /* put the subbuffer */ - result = put_subbuffer(buf); - if(result == -1) { - ERR("unknown error putting subbuffer (channel=%s)", buf->name); - break; - } - else if(result == PUT_SUBBUF_PUSHED) { - ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name); - break; - } - else if(result == PUT_SUBBUF_DIED) { - DBG("application died while putting subbuffer"); - /* Skip the first subbuffer. We are not sure it is trustable - * because the put_subbuffer() did not complete. - */ - unwrite_last_subbuffer(buf); - finish_consuming_dead_subbuffer(buf); - break; - } - else if(result == PUT_SUBBUF_DONE) { - /* Done with this subbuffer */ - /* FIXME: add a case where this branch is used? Upon - * normal trace termination, at put_subbuf time, a - * special last-subbuffer code could be returned by - * the listener. - */ - break; - } - else if(result == PUT_SUBBUF_OK) { - } - } - - DBG("thread for buffer %s is stopping", buf->name); + if(!callbacks) + return NULL; - /* FIXME: destroy, unalloc... */ + callbacks->on_open_buffer = on_open_buffer; + callbacks->on_close_buffer = on_close_buffer; + callbacks->on_read_subbuffer = on_read_subbuffer; + callbacks->on_read_partial_subbuffer = on_read_partial_subbuffer; + callbacks->on_put_error = on_put_error; + callbacks->on_new_thread = NULL; + callbacks->on_close_thread = NULL; + callbacks->on_trace_end = NULL; - pthread_cleanup_pop(1); + return callbacks; - return 0; } -struct consumer_thread_args { - pid_t pid; - const char *bufname; -}; - -void *consumer_thread(void *arg) +int is_directory(const char *dir) { - struct buffer_info *buf = (struct buffer_info *) arg; - struct consumer_thread_args *args = (struct consumer_thread_args *) arg; int result; - sigset_t sigset; - - DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname); + struct stat st; - /* Block signals that should be handled by the main thread. */ - result = sigemptyset(&sigset); - if(result == -1) { - PERROR("sigemptyset"); - goto end; - } - result = sigaddset(&sigset, SIGTERM); - if(result == -1) { - PERROR("sigaddset"); - goto end; - } - result = sigaddset(&sigset, SIGINT); - if(result == -1) { - PERROR("sigaddset"); - goto end; - } - result = sigprocmask(SIG_BLOCK, &sigset, NULL); + result = stat(dir, &st); if(result == -1) { - PERROR("sigprocmask"); - goto end; - } - - buf = connect_buffer(args->pid, args->bufname); - if(buf == NULL) { - ERR("failed to connect to buffer"); - goto end; + PERROR("stat"); + return 0; } - consumer_loop(buf); - - destroy_buffer(buf); - - end: - free((void *)args->bufname); - free(args); - return NULL; -} - -int start_consuming_buffer(pid_t pid, const char *bufname) -{ - pthread_t thr; - struct consumer_thread_args *args; - int result; - - DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname); - - args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args)); - - args->pid = pid; - args->bufname = strdup(bufname); - DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); - - result = pthread_create(&thr, NULL, consumer_thread, args); - if(result == -1) { - ERR("pthread_create failed"); - return -1; - } - result = pthread_detach(thr); - if(result == -1) { - ERR("pthread_detach failed"); - return -1; + if(!S_ISDIR(st.st_mode)) { + return 0; } - DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); - return 0; + return 1; } void usage(void) @@ -690,25 +338,7 @@ int parse_args(int argc, char **argv) void sigterm_handler(int sig) { - terminate_req = 1; -} - -static int write_pidfile(const char *file_name, pid_t pid) -{ - FILE *pidfp; - - pidfp = fopen(file_name, "w"); - if(!pidfp) { - PERROR("fopen (%s)", pidfile); - WARN("killing child process"); - return -1; - } - - fprintf(pidfp, "%d\n", pid); - - fclose(pidfp); - - return 0; + libustd_stop_instance(instance, 0); } int start_ustd(int fd) @@ -719,6 +349,12 @@ int start_ustd(int fd) struct sigaction sa; int timeout = -1; + struct libustd_callbacks *callbacks = new_callbacks(); + if(!callbacks) { + PERROR("new_callbacks"); + return 1; + } + result = sigemptyset(&sigset); if(result == -1) { PERROR("sigemptyset"); @@ -738,9 +374,15 @@ int start_ustd(int fd) return 1; } - result = ustcomm_init_ustd(&ustd, sock_path); - if(result == -1) { - ERR("failed to initialize socket"); + instance = libustd_new_instance(callbacks, sock_path); + if(!instance) { + ERR("failed to create libustd instance"); + return 1; + } + + result = libustd_init_instance(instance); + if(result) { + ERR("failed to initialize libustd instance"); return 1; } @@ -788,61 +430,9 @@ int start_ustd(int fd) } } - /* app loop */ - for(;;) { - char *recvbuf; - - /* check for requests on our public socket */ - result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, timeout); - if(result == -1 && errno == EINTR) { - /* Caught signal */ - } - else if(result == -1) { - ERR("error in ustcomm_ustd_recv_message"); - goto loop_end; - } - else if(result > 0) { - if(!strncmp(recvbuf, "collect", 7)) { - pid_t pid; - char *bufname; - int result; - - result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); - if(result != 2) { - ERR("parsing error: %s", recvbuf); - goto free_bufname; - } - - result = start_consuming_buffer(pid, bufname); - if(result < 0) { - ERR("error in add_buffer"); - goto free_bufname; - } - - free_bufname: - free(bufname); - } - else { - WARN("unknown command: %s", recvbuf); - } - - free(recvbuf); - } - - loop_end: - - if(terminate_req) { - pthread_mutex_lock(&active_buffers_mutex); - if(active_buffers == 0) { - pthread_mutex_unlock(&active_buffers_mutex); - break; - } - pthread_mutex_unlock(&active_buffers_mutex); - timeout = 100; - } - } + libustd_start_instance(instance); - ustcomm_fini_ustd(&ustd); + free(callbacks); return 0; } diff --git a/ustd/ustd.h b/ustd/ustd.h deleted file mode 100644 index d0e6fe5..0000000 --- a/ustd/ustd.h +++ /dev/null @@ -1,58 +0,0 @@ -/* Copyright (C) 2009 Pierre-Marc Fournier - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - */ - -#ifndef USTD_H -#define USTD_H - -#include "ustcomm.h" - -#define USTD_DEFAULT_TRACE_PATH "/tmp/usttrace" - -struct buffer_info { - const char *name; - pid_t pid; - struct ustcomm_connection conn; - - int shmid; - int bufstruct_shmid; - - /* the buffer memory */ - void *mem; - /* buffer size */ - int memlen; - /* number of subbuffers in buffer */ - int n_subbufs; - /* size of each subbuffer */ - int subbuf_size; - - /* the buffer information struct */ - void *bufstruct_mem; - - int file_fd; /* output file */ - - long consumed_old; - - s64 pidunique; - - /* the offset we must truncate to, to unput the last subbuffer */ - off_t previous_offset; -}; - -void finish_consuming_dead_subbuffer(struct buffer_info *buf); -size_t subbuffer_data_size(void *subbuf); - -#endif /* USTD_H */ -- 2.34.1