X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=187c0c30cdf003741f053417e1c9209c64b80713;hb=0475c50c4d3d2cea973fe4d1f17875d231dea96c;hp=892c841ba0f8958ae46cff1f4d577f0426df1415;hpb=41ba603553a90bde7c47eb10098224bde504d571;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 892c841ba..187c0c30c 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -30,6 +30,7 @@ #include #include +#include #include #include #include @@ -93,6 +94,18 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe) (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); } +static void notify_health_quit_pipe(int *pipe) +{ + int ret; + + do { + ret = write(pipe[1], "4", 1); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != 1) { + PERROR("write consumer health quit"); + } +} + static void notify_channel_pipe(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *chan, uint64_t key, @@ -2182,7 +2195,7 @@ static void validate_endpoint_status_metadata_stream( */ void *consumer_thread_metadata_poll(void *data) { - int ret, i, pollfd; + int ret, i, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_consumer_stream *stream = NULL; struct lttng_ht_iter iter; @@ -2193,6 +2206,10 @@ void *consumer_thread_metadata_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA); + + health_code_update(); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!metadata_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2218,14 +2235,19 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { + health_code_update(); + /* Only the metadata pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } restart: DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); DBG("Metadata event catched in thread"); if (ret < 0) { if (errno == EINTR) { @@ -2239,6 +2261,8 @@ restart: /* From here, the event is a metadata wait fd */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -2308,6 +2332,8 @@ restart: /* We just flushed the stream now read it. */ do { + health_code_update(); + len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get @@ -2330,6 +2356,8 @@ restart: assert(stream->wait_fd == pollfd); do { + health_code_update(); + len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get @@ -2352,6 +2380,8 @@ restart: } } + /* All is OK */ + err = 0; error: end: DBG("Metadata poll thread exiting"); @@ -2360,6 +2390,11 @@ end: end_poll: destroy_stream_ht(metadata_ht); end_ht: + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); rcu_unregister_thread(); return NULL; } @@ -2370,7 +2405,7 @@ end_ht: */ void *consumer_thread_data_poll(void *data) { - int num_rdy, num_hup, high_prio, ret, i; + int num_rdy, num_hup, high_prio, ret, i, err = -1; struct pollfd *pollfd = NULL; /* local view of the streams */ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; @@ -2381,6 +2416,10 @@ void *consumer_thread_data_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA); + + health_code_update(); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (data_ht == NULL) { /* ENOMEM at this point. Better to bail out. */ @@ -2394,6 +2433,8 @@ void *consumer_thread_data_poll(void *data) } while (1) { + health_code_update(); + high_prio = 0; num_hup = 0; @@ -2440,12 +2481,15 @@ void *consumer_thread_data_poll(void *data) /* No FDs and consumer_quit, consumer_cleanup the thread */ if (nb_fd == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } /* poll on the array of fds */ restart: DBG("polling on %d fd", nb_fd + 1); + health_poll_entry(); num_rdy = poll(pollfd, nb_fd + 1, -1); + health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { /* @@ -2495,6 +2539,8 @@ void *consumer_thread_data_poll(void *data) /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2523,6 +2569,8 @@ void *consumer_thread_data_poll(void *data) /* Take care of low priority channels. */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2543,6 +2591,8 @@ void *consumer_thread_data_poll(void *data) /* Handle hangup and errors */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2588,6 +2638,8 @@ void *consumer_thread_data_poll(void *data) } } } + /* All is OK */ + err = 0; end: DBG("polling thread exiting"); free(pollfd); @@ -2605,6 +2657,12 @@ end: destroy_data_stream_ht(data_ht); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); + rcu_unregister_thread(); return NULL; } @@ -2686,7 +2744,7 @@ static void destroy_channel_ht(struct lttng_ht *ht) */ void *consumer_thread_channel_poll(void *data) { - int ret, i, pollfd; + int ret, i, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_consumer_channel *chan = NULL; struct lttng_ht_iter iter; @@ -2697,6 +2755,10 @@ void *consumer_thread_channel_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL); + + health_code_update(); + channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!channel_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2721,14 +2783,19 @@ void *consumer_thread_channel_poll(void *data) DBG("Channel main loop started"); while (1) { + health_code_update(); + /* Only the channel pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } restart: DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); DBG("Channel event catched in thread"); if (ret < 0) { if (errno == EINTR) { @@ -2742,6 +2809,8 @@ restart: /* From here, the event is a channel wait fd */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -2808,6 +2877,8 @@ restart: /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head, send_node) { + health_code_update(); + cds_list_del(&stream->send_node); lttng_ustconsumer_del_stream(stream); uatomic_sub(&stream->chan->refcount, 1); @@ -2880,12 +2951,19 @@ restart: } } + /* All is OK */ + err = 0; end: lttng_poll_clean(&events); end_poll: destroy_channel_ht(channel_ht); end_ht: DBG("Channel poll thread exiting"); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); rcu_unregister_thread(); return NULL; } @@ -2923,7 +3001,7 @@ error: */ void *consumer_thread_sessiond_poll(void *data) { - int sock = -1, client_socket, ret; + int sock = -1, client_socket, ret, err = -1; /* * structure to poll for incoming data on communication socket avoids * making blocking sockets. @@ -2933,6 +3011,10 @@ void *consumer_thread_sessiond_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND); + + health_code_update(); + DBG("Creating command socket %s", ctx->consumer_command_sock_path); unlink(ctx->consumer_command_sock_path); client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path); @@ -2993,7 +3075,12 @@ void *consumer_thread_sessiond_poll(void *data) consumer_sockpoll[1].events = POLLIN | POLLPRI; while (1) { - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + health_code_update(); + + health_poll_entry(); + ret = lttng_consumer_poll_socket(consumer_sockpoll); + health_poll_exit(); + if (ret < 0) { goto end; } DBG("Incoming command on sock"); @@ -3013,10 +3100,14 @@ void *consumer_thread_sessiond_poll(void *data) } if (consumer_quit) { DBG("consumer_thread_receive_fds received quit from signal"); + err = 0; /* All is OK */ goto end; } DBG("received command on sock"); } + /* All is OK */ + err = 0; + end: DBG("Consumer thread sessiond poll exiting"); @@ -3042,6 +3133,8 @@ end: notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT); + notify_health_quit_pipe(health_quit_pipe); + /* Cleaning up possibly open sockets. */ if (sock >= 0) { ret = close(sock); @@ -3056,6 +3149,12 @@ end: } } + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); + rcu_unregister_thread(); return NULL; }