X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=kconsumerd%2Fkconsumerd.c;h=315095ee3c102794817176262d096e1b21089b96;hp=7e166b8f144886dc254cf997c4753721e3350efb;hb=bcd8d9db8fccd691c1e919bfa7c0d49a2fda35f9;hpb=d4a1283e471c152cedf6cb5ebcc74d509009145d diff --git a/kconsumerd/kconsumerd.c b/kconsumerd/kconsumerd.c index 7e166b8f1..315095ee3 100644 --- a/kconsumerd/kconsumerd.c +++ b/kconsumerd/kconsumerd.c @@ -72,6 +72,28 @@ static const char *progname; static char command_sock_path[PATH_MAX]; /* Global command socket path */ static char error_sock_path[PATH_MAX]; /* Global error path */ +/* + * del_fd + * + * Remove a fd from the global list protected by a mutex + */ +static void del_fd(struct ltt_kconsumerd_fd *lcf) +{ + pthread_mutex_lock(&kconsumerd_lock_fds); + cds_list_del(&lcf->list); + if (fds_count > 0) { + fds_count--; + DBG("Removed ltt_kconsumerd_fd"); + if (lcf != NULL) { + close(lcf->out_fd); + close(lcf->consumerd_fd); + free(lcf); + lcf = NULL; + } + } + pthread_mutex_unlock(&kconsumerd_lock_fds); +} + /* * cleanup * @@ -79,7 +101,21 @@ static char error_sock_path[PATH_MAX]; /* Global error path */ */ static void cleanup() { + struct ltt_kconsumerd_fd *iter; + + + /* remove the socket file */ unlink(command_sock_path); + + /* unblock the threads */ + WARN("Terminating the threads before exiting"); + pthread_cancel(threads[0]); + pthread_cancel(threads[1]); + + /* close all outfd */ + cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + del_fd(iter); + } } /* send_error @@ -96,21 +132,6 @@ static int send_error(enum lttcomm_return_code cmd) } } -/* - * cleanup_kconsumerd_fd - * - * Close the FDs and frees a ltt_kconsumerd_fd struct - */ -static void cleanup_kconsumerd_fd(struct ltt_kconsumerd_fd *lcf) -{ - if (lcf != NULL) { - close(lcf->out_fd); - close(lcf->consumerd_fd); - free(lcf); - lcf = NULL; - } -} - /* * add_fd * @@ -152,36 +173,6 @@ end: return ret; } -/* - * del_fd - * - * Remove a fd from the global list protected by a mutex - */ -static void del_fd(struct ltt_kconsumerd_fd *lcf) -{ - pthread_mutex_lock(&kconsumerd_lock_fds); - cds_list_del(&lcf->list); - if (fds_count > 0) { - fds_count--; - DBG("Removed ltt_kconsumerd_fd"); - cleanup_kconsumerd_fd(lcf); - } - pthread_mutex_unlock(&kconsumerd_lock_fds); -} - -/* - * close_outfds - * - * Close all fds in the previous fd_list - * Must be used with kconsumerd_lock_fds lock held - */ -static void close_outfds() -{ - struct ltt_kconsumerd_fd *iter; - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { - del_fd(iter); - } -} /* * sighandler @@ -190,11 +181,6 @@ static void close_outfds() */ static void sighandler(int sig) { - /* unblock the threads */ - pthread_cancel(threads[0]); - pthread_cancel(threads[1]); - - close_outfds(); cleanup(); return; @@ -260,16 +246,18 @@ static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd, SPLICE_F_MOVE | SPLICE_F_MORE); DBG("splice chan to pipe ret %ld", ret); if (ret < 0) { + ret = errno; perror("Error in relay splice"); - goto write_end; + goto splice_error; } ret = splice(thread_pipe[0], NULL, outfd, NULL, ret, SPLICE_F_MOVE | SPLICE_F_MORE); DBG("splice pipe to file %ld", ret); if (ret < 0) { + ret = errno; perror("Error in file splice"); - goto write_end; + goto splice_error; } if (ret >= len) { len = 0; @@ -279,7 +267,7 @@ static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd, SYNC_FILE_RANGE_WRITE); kconsumerd_fd->out_fd_offset += ret; } -write_end: + /* * This does a blocking write-and-wait on any page that belongs to the * subbuffer prior to the one we just wrote. @@ -307,6 +295,26 @@ write_end: posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); } + goto end; + +splice_error: + /* send the appropriate error description to sessiond */ + switch(ret) { + case EBADF: + send_error(KCONSUMERD_SPLICE_EBADF); + break; + case EINVAL: + send_error(KCONSUMERD_SPLICE_EINVAL); + break; + case ENOMEM: + send_error(KCONSUMERD_SPLICE_ENOMEM); + break; + case ESPIPE: + send_error(KCONSUMERD_SPLICE_ESPIPE); + break; + } + +end: return ret; } @@ -507,6 +515,7 @@ static void *thread_receive_fds(void *data) /* Blocking call, waiting for transmission */ sock = lttcomm_accept_unix_sock(client_socket); if (sock <= 0) { + WARN("On accept, retrying"); continue; } @@ -514,16 +523,18 @@ static void *thread_receive_fds(void *data) ret = lttcomm_recv_unix_sock(sock, &tmp, sizeof(struct lttcomm_kconsumerd_header)); if (ret < 0) { - ERR("Receiving the lttcomm_kconsumerd_header"); - continue; + ERR("Receiving the lttcomm_kconsumerd_header, exiting"); + goto error; } ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type); if (ret < 0) { - continue; + ERR("Receiving the FD, exiting"); + goto error; } } error: + cleanup(); return NULL; } @@ -696,6 +707,7 @@ end: free(local_kconsumerd_fd); local_kconsumerd_fd = NULL; } + cleanup(); return NULL; }