X-Git-Url: https://git.lttng.org/?p=ust.git;a=blobdiff_plain;f=libustconsumer%2Flibustconsumer.c;h=c6dd20c355050c69c04ec8a5e97562f9d49c7056;hp=ef54fe807352bf5be8242b8bf0e205fe1aaa26c4;hb=9edd34bd25f52dd39b354a84f02697254121aefd;hpb=cd6b724338f1156ce7281f00a06ea848bec7213c diff --git a/libustconsumer/libustconsumer.c b/libustconsumer/libustconsumer.c index ef54fe8..c6dd20c 100644 --- a/libustconsumer/libustconsumer.c +++ b/libustconsumer/libustconsumer.c @@ -34,7 +34,7 @@ #include #include "lowlevel.h" -#include "usterr.h" +#include "usterr_signal_safe.h" #include "ustcomm.h" #define GET_SUBBUF_OK 1 @@ -146,7 +146,7 @@ void decrement_active_buffers(void *arg) pthread_mutex_unlock(&instance->mutex); } -static int get_pidunique(int sock, s64 *pidunique) +static int get_pidunique(int sock, int64_t *pidunique) { struct ustcomm_header _send_hdr, *send_hdr; struct ustcomm_header _recv_hdr, *recv_hdr; @@ -426,6 +426,11 @@ static void destroy_buffer(struct ustconsumer_callbacks *callbacks, { int result; + result = close(buf->pipe_fd); + if(result == -1) { + WARN("problem closing the pipe fd"); + } + result = close(buf->app_sock); if(result == -1) { WARN("problem calling ustcomm_close_app"); @@ -472,6 +477,8 @@ int consumer_loop(struct ustconsumer_instance *instance, struct buffer_info *buf DBG("App died while being traced"); finish_consuming_dead_subbuffer(instance->callbacks, buf); break; + } else if (read_result == -1 && errno == EINTR) { + continue; } if(instance->callbacks->on_read_subbuffer) @@ -536,6 +543,10 @@ void *consumer_thread(void *arg) int result; sigset_t sigset; + pthread_mutex_lock(&args->instance->mutex); + args->instance->active_threads++; + pthread_mutex_unlock(&args->instance->mutex); + if(args->instance->callbacks->on_new_thread) args->instance->callbacks->on_new_thread(args->instance->callbacks); @@ -577,6 +588,10 @@ void *consumer_thread(void *arg) if(args->instance->callbacks->on_close_thread) args->instance->callbacks->on_close_thread(args->instance->callbacks); + pthread_mutex_lock(&args->instance->mutex); + args->instance->active_threads--; + pthread_mutex_unlock(&args->instance->mutex); + free((void *)args->channel); free(args); return NULL; @@ -728,7 +743,7 @@ int ustconsumer_start_instance(struct ustconsumer_instance *instance) if (instance->quit_program) { pthread_mutex_lock(&instance->mutex); - if(instance->active_buffers == 0) { + if (instance->active_buffers == 0 && instance->active_threads == 0) { pthread_mutex_unlock(&instance->mutex); break; } @@ -778,8 +793,11 @@ int ustconsumer_stop_instance(struct ustconsumer_instance *instance, int send_ms struct sockaddr_un addr; +socket_again: result = fd = socket(PF_UNIX, SOCK_STREAM, 0); if(result == -1) { + if (errno == EINTR) + goto socket_again; PERROR("socket"); return 1; } @@ -789,13 +807,21 @@ int ustconsumer_stop_instance(struct ustconsumer_instance *instance, int send_ms strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX); addr.sun_path[UNIX_PATH_MAX-1] = '\0'; +connect_again: result = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); if(result == -1) { + if (errno == EINTR) + goto connect_again; PERROR("connect"); } - while(bytes != sizeof(msg)) - bytes += send(fd, msg, sizeof(msg), 0); + while(bytes != sizeof(msg)) { + int inc = send(fd, msg, sizeof(msg), 0); + if (inc < 0 && errno != EINTR) + break; + else + bytes += inc; + } close(fd); @@ -841,7 +867,7 @@ static int init_ustconsumer_socket(struct ustconsumer_instance *instance) int result; /* Only check if socket dir exists if we are using the default directory */ - result = ensure_dir_exists(SOCK_DIR); + result = ensure_dir_exists(SOCK_DIR, S_IRWXU | S_IRWXG | S_IRWXO); if (result == -1) { ERR("Unable to create socket directory %s", SOCK_DIR); return -1;