X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-consumerd%2Flttng-consumerd.c;h=299776aa00400ea6ab806cb98a49f1a1b0e30f98;hp=3bc700dc948594a937c2ef543a8cbe4c3d4212f7;hb=6c1c0768320135c6936c371b09731851b508c023;hpb=ceed52b545103258e84f1c7040700be9dbbbaec6 diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index 3bc700dc9..299776aa0 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -43,17 +44,21 @@ #include #include -#include -#include +#include +#include +#include #include -#include +#include #include "lttng-consumerd.h" +#include "health-consumerd.h" /* TODO : support UST (all direct kernel-ctl accesses). */ -/* the two threads (receive fd and poll) */ -static pthread_t threads[2]; +/* threads (channel handling, poll, metadata, sessiond) */ + +static pthread_t channel_thread, data_thread, metadata_thread, + sessiond_thread, metadata_timer_thread, health_thread; /* to count the number of times the user pressed ctrl+c */ static int sigintcount = 0; @@ -61,6 +66,8 @@ static int sigintcount = 0; /* Argument variables */ int lttng_opt_quiet; /* not static in error.h */ int lttng_opt_verbose; /* not static in error.h */ +int lttng_opt_mi; /* not static in error.h */ + static int opt_daemon; static const char *progname; static char command_sock_path[PATH_MAX]; /* Global command socket path */ @@ -70,6 +77,21 @@ static enum lttng_consumer_type opt_type = LTTNG_CONSUMER_KERNEL; /* the liblttngconsumerd context */ static struct lttng_consumer_local_data *ctx; +/* Consumerd health monitoring */ +struct health_app *health_consumerd; + +const char *tracing_group_name = DEFAULT_TRACING_GROUP; + +int lttng_consumer_ready = NR_LTTNG_CONSUMER_READY; + +enum lttng_consumer_type lttng_consumer_get_type(void) +{ + if (!ctx) { + return LTTNG_CONSUMER_UNKNOWN; + } + return ctx->type; +} + /* * Signal handler for the daemon */ @@ -80,6 +102,14 @@ static void sighandler(int sig) return; } + /* + * Ignore SIGPIPE because it should not stop the consumer whenever a + * SIGPIPE is catched through a FD operation. + */ + if (sig == SIGPIPE) { + return; + } + lttng_consumer_should_exit(ctx); } @@ -127,9 +157,9 @@ static void usage(FILE *fp) fprintf(fp, "Usage: %s OPTIONS\n\nOptions:\n", progname); fprintf(fp, " -h, --help " "Display this usage.\n"); - fprintf(fp, " -c, --consumerd-cmd-sock PATH " + fprintf(fp, " -c, --consumerd-cmd-sock PATH " "Specify path for the command socket\n"); - fprintf(fp, " -e, --consumerd-err-sock PATH " + fprintf(fp, " -e, --consumerd-err-sock PATH " "Specify path for the error socket\n"); fprintf(fp, " -d, --daemonize " "Start as a daemon.\n"); @@ -139,6 +169,8 @@ static void usage(FILE *fp) "Verbose mode. Activate DBG() macro.\n"); fprintf(fp, " -V, --version " "Show version number.\n"); + fprintf(fp, " -g, --group NAME " + "Specify the tracing group name. (default: tracing)\n"); fprintf(fp, " -k, --kernel " "Consumer kernel buffers (default).\n"); fprintf(fp, " -u, --ust " @@ -162,6 +194,7 @@ static void parse_args(int argc, char **argv) { "consumerd-cmd-sock", 1, 0, 'c' }, { "consumerd-err-sock", 1, 0, 'e' }, { "daemonize", 0, 0, 'd' }, + { "group", 1, 0, 'g' }, { "help", 0, 0, 'h' }, { "quiet", 0, 0, 'q' }, { "verbose", 0, 0, 'v' }, @@ -175,7 +208,7 @@ static void parse_args(int argc, char **argv) while (1) { int option_index = 0; - c = getopt_long(argc, argv, "dhqvVku" "c:e:", long_options, &option_index); + c = getopt_long(argc, argv, "dhqvVku" "c:e:g:", long_options, &option_index); if (c == -1) { break; } @@ -196,6 +229,9 @@ static void parse_args(int argc, char **argv) case 'd': opt_daemon = 1; break; + case 'g': + tracing_group_name = optarg; + break; case 'h': usage(stdout); exit(EXIT_SUCCESS); @@ -253,7 +289,6 @@ static void set_ulimit(void) */ int main(int argc, char **argv) { - int i; int ret = 0; void *status; @@ -285,7 +320,10 @@ int main(int argc, char **argv) } } - if (strlen(command_sock_path) == 0) { + /* Set up max poll set size */ + lttng_poll_set_max_size(); + + if (*command_sock_path == '\0') { switch (opt_type) { case LTTNG_CONSUMER_KERNEL: snprintf(command_sock_path, PATH_MAX, DEFAULT_KCONSUMERD_CMD_SOCK_PATH, @@ -306,13 +344,24 @@ int main(int argc, char **argv) } /* Init */ - lttng_consumer_init(); + if (lttng_consumer_init() < 0) { + goto error; + } + + /* Init socket timeouts */ + lttcomm_init(); + lttcomm_inet_init(); if (!getuid()) { /* Set limit for open files */ set_ulimit(); } + health_consumerd = health_app_create(NR_HEALTH_CONSUMERD_TYPES); + if (!health_consumerd) { + goto error; + } + /* create the consumer instance with and assign the callbacks */ ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer, NULL, lttng_consumer_on_recv_stream, NULL); @@ -321,7 +370,7 @@ int main(int argc, char **argv) } lttng_consumer_set_command_sock_path(ctx, command_sock_path); - if (strlen(error_sock_path) == 0) { + if (*error_sock_path == '\0') { switch (opt_type) { case LTTNG_CONSUMER_KERNEL: snprintf(error_sock_path, PATH_MAX, DEFAULT_KCONSUMERD_ERR_SOCK_PATH, @@ -354,40 +403,138 @@ int main(int argc, char **argv) } lttng_consumer_set_error_sock(ctx, ret); - /* Create the thread to manage the receive of fd */ - ret = pthread_create(&threads[0], NULL, lttng_consumer_thread_receive_fds, + /* + * Block RT signals used for UST periodical metadata flush and the live + * timer in main, and create a dedicated thread to handle these signals. + */ + consumer_signal_init(); + + ctx->type = opt_type; + + ret = utils_create_pipe(health_quit_pipe); + if (ret < 0) { + goto error_health_pipe; + } + + /* Create thread to manage the client socket */ + ret = pthread_create(&health_thread, NULL, + thread_manage_health, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create health"); + goto health_error; + } + + /* + * Wait for health thread to be initialized before letting the + * sessiond thread reply to the sessiond that we are ready. + */ + while (uatomic_read(<tng_consumer_ready)) { + usleep(100000); + } + cmm_smp_mb(); /* Read ready before following operations */ + + /* Create thread to manage channels */ + ret = pthread_create(&channel_thread, NULL, consumer_thread_channel_poll, (void *) ctx); if (ret != 0) { perror("pthread_create"); - goto error; + goto channel_error; } - /* Create thread to manage the polling/writing of traces */ - ret = pthread_create(&threads[1], NULL, lttng_consumer_thread_poll_fds, + /* Create thread to manage the polling/writing of trace metadata */ + ret = pthread_create(&metadata_thread, NULL, consumer_thread_metadata_poll, (void *) ctx); if (ret != 0) { perror("pthread_create"); + goto metadata_error; + } + + /* Create thread to manage the polling/writing of trace data */ + ret = pthread_create(&data_thread, NULL, consumer_thread_data_poll, + (void *) ctx); + if (ret != 0) { + perror("pthread_create"); + goto data_error; + } + + /* Create the thread to manage the receive of fd */ + ret = pthread_create(&sessiond_thread, NULL, consumer_thread_sessiond_poll, + (void *) ctx); + if (ret != 0) { + perror("pthread_create"); + goto sessiond_error; + } + + /* + * Create the thread to manage the UST metadata periodic timer and + * live timer. + */ + ret = pthread_create(&metadata_timer_thread, NULL, + consumer_timer_thread, (void *) ctx); + if (ret != 0) { + perror("pthread_create"); + goto metadata_timer_error; + } + + ret = pthread_detach(metadata_timer_thread); + if (ret) { + errno = ret; + perror("pthread_detach"); + } + +metadata_timer_error: + ret = pthread_join(sessiond_thread, &status); + if (ret != 0) { + perror("pthread_join"); goto error; } - for (i = 0; i < 2; i++) { - ret = pthread_join(threads[i], &status); - if (ret != 0) { - perror("pthread_join"); - goto error; - } +sessiond_error: + ret = pthread_join(data_thread, &status); + if (ret != 0) { + perror("pthread_join"); + goto error; + } + +data_error: + ret = pthread_join(metadata_thread, &status); + if (ret != 0) { + perror("pthread_join"); + goto error; + } + +metadata_error: + ret = pthread_join(channel_thread, &status); + if (ret != 0) { + perror("pthread_join"); + goto error; + } + +channel_error: + ret = pthread_join(health_thread, &status); + if (ret != 0) { + PERROR("pthread_join health thread"); + goto error; /* join error, exit without cleanup */ + } + +health_error: + utils_close_pipe(health_quit_pipe); + +error_health_pipe: + if (!ret) { + ret = EXIT_SUCCESS; + goto end; } - ret = EXIT_SUCCESS; - lttng_consumer_send_error(ctx, CONSUMERD_EXIT_SUCCESS); - goto end; error: ret = EXIT_FAILURE; - lttng_consumer_send_error(ctx, CONSUMERD_EXIT_FAILURE); end: lttng_consumer_destroy(ctx); lttng_consumer_cleanup(); + if (health_consumerd) { + health_app_destroy(health_consumerd); + } return ret; }