X-Git-Url: https://git.lttng.org/?p=ust.git;a=blobdiff_plain;f=libustconsumer%2Flibustconsumer.c;h=c6dd20c355050c69c04ec8a5e97562f9d49c7056;hp=6cb3dbf3b472327c65b1a8363d552a23c6b042ec;hb=9edd34bd25f52dd39b354a84f02697254121aefd;hpb=9dc7b7ff797a5cbb1e9ffd59e053a04562f306c4 diff --git a/libustconsumer/libustconsumer.c b/libustconsumer/libustconsumer.c index 6cb3dbf..c6dd20c 100644 --- a/libustconsumer/libustconsumer.c +++ b/libustconsumer/libustconsumer.c @@ -34,7 +34,7 @@ #include #include "lowlevel.h" -#include "usterr.h" +#include "usterr_signal_safe.h" #include "ustcomm.h" #define GET_SUBBUF_OK 1 @@ -146,7 +146,7 @@ void decrement_active_buffers(void *arg) pthread_mutex_unlock(&instance->mutex); } -static int get_pidunique(int sock, s64 *pidunique) +static int get_pidunique(int sock, int64_t *pidunique) { struct ustcomm_header _send_hdr, *send_hdr; struct ustcomm_header _recv_hdr, *recv_hdr; @@ -353,6 +353,10 @@ struct buffer_info *connect_buffer(struct ustconsumer_instance *instance, pid_t goto close_fifo; } + /* Set subbuffer's information */ + buf->subbuf_size_order = get_count_order(buf->subbuf_size); + buf->alloc_size = buf->subbuf_size * buf->n_subbufs; + /* attach memory */ buf->mem = shmat(buf->shmid, NULL, 0); if(buf->mem == (void *) 0) { @@ -422,6 +426,11 @@ static void destroy_buffer(struct ustconsumer_callbacks *callbacks, { int result; + result = close(buf->pipe_fd); + if(result == -1) { + WARN("problem closing the pipe fd"); + } + result = close(buf->app_sock); if(result == -1) { WARN("problem calling ustcomm_close_app"); @@ -445,7 +454,8 @@ static void destroy_buffer(struct ustconsumer_callbacks *callbacks, int consumer_loop(struct ustconsumer_instance *instance, struct buffer_info *buf) { - int result, read_result; + int result = 0; + int read_result; char read_buf; pthread_cleanup_push(decrement_active_buffers, instance); @@ -467,6 +477,8 @@ int consumer_loop(struct ustconsumer_instance *instance, struct buffer_info *buf DBG("App died while being traced"); finish_consuming_dead_subbuffer(instance->callbacks, buf); break; + } else if (read_result == -1 && errno == EINTR) { + continue; } if(instance->callbacks->on_read_subbuffer) @@ -487,6 +499,7 @@ int consumer_loop(struct ustconsumer_instance *instance, struct buffer_info *buf /* Skip the first subbuffer. We are not sure it is trustable * because the put_subbuffer() did not complete. */ + /* TODO: check on_put_error return value */ if(instance->callbacks->on_put_error) instance->callbacks->on_put_error(instance->callbacks, buf); @@ -530,6 +543,10 @@ void *consumer_thread(void *arg) int result; sigset_t sigset; + pthread_mutex_lock(&args->instance->mutex); + args->instance->active_threads++; + pthread_mutex_unlock(&args->instance->mutex); + if(args->instance->callbacks->on_new_thread) args->instance->callbacks->on_new_thread(args->instance->callbacks); @@ -571,6 +588,10 @@ void *consumer_thread(void *arg) if(args->instance->callbacks->on_close_thread) args->instance->callbacks->on_close_thread(args->instance->callbacks); + pthread_mutex_lock(&args->instance->mutex); + args->instance->active_threads--; + pthread_mutex_unlock(&args->instance->mutex); + free((void *)args->channel); free(args); return NULL; @@ -598,7 +619,7 @@ int start_consuming_buffer(struct ustconsumer_instance *instance, pid_t pid, args->channel_cpu = channel_cpu; args->instance = instance; DBG("beginning2 of start_consuming_buffer: args: pid %d trace %s" - " bufname %s_%d", args->pid, args->channel, args->channel_cpu); + " bufname %s_%d", args->pid, args->trace, args->channel, args->channel_cpu); result = pthread_create(&thr, NULL, consumer_thread, args); if(result == -1) { @@ -611,7 +632,7 @@ int start_consuming_buffer(struct ustconsumer_instance *instance, pid_t pid, return -1; } DBG("end of start_consuming_buffer: args: pid %d trace %s " - "bufname %s_%d", args->pid, args->channel, args->channel_cpu); + "bufname %s_%d", args->pid, args->channel, args->trace, args->channel_cpu); return 0; } @@ -619,7 +640,7 @@ static void process_client_cmd(int sock, struct ustcomm_header *req_header, char *recvbuf, struct ustconsumer_instance *instance) { int result; - struct ustcomm_header _res_header; + struct ustcomm_header _res_header = {0}; struct ustcomm_header *res_header = &_res_header; struct ustcomm_buffer_info *buf_inf; @@ -722,7 +743,7 @@ int ustconsumer_start_instance(struct ustconsumer_instance *instance) if (instance->quit_program) { pthread_mutex_lock(&instance->mutex); - if(instance->active_buffers == 0) { + if (instance->active_buffers == 0 && instance->active_threads == 0) { pthread_mutex_unlock(&instance->mutex); break; } @@ -772,8 +793,11 @@ int ustconsumer_stop_instance(struct ustconsumer_instance *instance, int send_ms struct sockaddr_un addr; +socket_again: result = fd = socket(PF_UNIX, SOCK_STREAM, 0); if(result == -1) { + if (errno == EINTR) + goto socket_again; PERROR("socket"); return 1; } @@ -783,13 +807,21 @@ int ustconsumer_stop_instance(struct ustconsumer_instance *instance, int send_ms strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX); addr.sun_path[UNIX_PATH_MAX-1] = '\0'; +connect_again: result = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); if(result == -1) { + if (errno == EINTR) + goto connect_again; PERROR("connect"); } - while(bytes != sizeof(msg)) - bytes += send(fd, msg, sizeof(msg), 0); + while(bytes != sizeof(msg)) { + int inc = send(fd, msg, sizeof(msg), 0); + if (inc < 0 && errno != EINTR) + break; + else + bytes += inc; + } close(fd); @@ -835,7 +867,7 @@ static int init_ustconsumer_socket(struct ustconsumer_instance *instance) int result; /* Only check if socket dir exists if we are using the default directory */ - result = ensure_dir_exists(SOCK_DIR); + result = ensure_dir_exists(SOCK_DIR, S_IRWXU | S_IRWXG | S_IRWXO); if (result == -1) { ERR("Unable to create socket directory %s", SOCK_DIR); return -1;