X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=857201def7d9d35d1228e23e9ed4d2472b4f2088;hp=0035aae82d51e1afaa538fddfeb6e7463bfa53db;hb=7567352fb68f5c3f49f549c579f5bd27c883bed2;hpb=fd20dac985126e84929d657f5a1042222c7d5017 diff --git a/src/common/consumer.c b/src/common/consumer.c index 0035aae82..857201def 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -48,6 +48,7 @@ #include "consumer.h" #include "consumer-stream.h" #include "consumer-testpoint.h" +#include "align.h" struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -285,6 +286,17 @@ static void free_channel_rcu(struct rcu_head *head) struct lttng_consumer_channel *channel = caa_container_of(node, struct lttng_consumer_channel, node); + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_free_channel(channel); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } free(channel); } @@ -562,6 +574,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_fd = -1; pthread_mutex_init(&stream->lock, NULL); + pthread_mutex_init(&stream->metadata_timer_lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ if (type == CONSUMER_CHANNEL_TYPE_METADATA) { @@ -700,7 +713,7 @@ end: /* * Allocate and return a consumer relayd socket. */ -struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( +static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( uint64_t net_seq_idx) { struct consumer_relayd_sock_pair *obj = NULL; @@ -935,7 +948,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t tracefile_count, uint64_t session_id_per_pid, unsigned int monitor, - unsigned int live_timer_interval) + unsigned int live_timer_interval, + const char *root_shm_path, + const char *shm_path) { struct lttng_consumer_channel *channel; @@ -992,6 +1007,15 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, strncpy(channel->name, name, sizeof(channel->name)); channel->name[sizeof(channel->name) - 1] = '\0'; + if (root_shm_path) { + strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path)); + channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0'; + } + if (shm_path) { + strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path)); + channel->shm_path[sizeof(channel->shm_path) - 1] = '\0'; + } + lttng_ht_node_init_u64(&channel->node, channel->key); channel->wait_fd = -1; @@ -1188,6 +1212,8 @@ void lttng_consumer_cleanup(void) * it. */ lttng_ht_destroy(consumer_data.stream_list_ht); + + run_as_destroy_worker(); } /* @@ -2181,18 +2207,13 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { - health_code_update(); - - /* Only the metadata pipe is set */ - if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { - err = 0; /* All is OK */ - goto end; - } - restart: - DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_code_update(); health_poll_entry(); + DBG("Metadata poll wait"); ret = lttng_poll_wait(&events, -1); + DBG("Metadata poll return from wait with %d fd(s)", + LTTNG_POLL_GETNB(&events)); health_poll_exit(); DBG("Metadata event catched in thread"); if (ret < 0) { @@ -2200,7 +2221,10 @@ restart: ERR("Poll EINTR catched"); goto restart; } - goto error; + if (LTTNG_POLL_GETNB(&events) == 0) { + err = 0; /* All is OK */ + } + goto end; } nb_fd = ret; @@ -2218,26 +2242,22 @@ restart: } if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { - if (revents & (LPOLLERR | LPOLLHUP )) { - DBG("Metadata thread pipe hung up"); - /* - * Remove the pipe from the poll set and continue the loop - * since their might be data to consume. - */ - lttng_poll_del(&events, - lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); - lttng_pipe_read_close(ctx->consumer_metadata_pipe); - continue; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { ssize_t pipe_len; pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, sizeof(stream)); if (pipe_len < sizeof(stream)) { - PERROR("read metadata stream"); + if (pipe_len < 0) { + PERROR("read metadata stream"); + } /* - * Continue here to handle the rest of the streams. + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. */ + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_pipe); continue; } @@ -2254,6 +2274,19 @@ restart: /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI | LPOLLHUP); + } else if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Metadata thread pipe hung up"); + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_pipe); + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto end; } /* Handle other stream */ @@ -2272,8 +2305,30 @@ restart: stream = caa_container_of(node, struct lttng_consumer_stream, node); - /* Check for error event */ - if (revents & (LPOLLERR | LPOLLHUP)) { + if (revents & (LPOLLIN | LPOLLPRI)) { + /* Get the data out of the metadata file descriptor */ + DBG("Metadata available on fd %d", pollfd); + assert(stream->wait_fd == pollfd); + + do { + health_code_update(); + + len = ctx->on_buffer_ready(stream, ctx); + /* + * We don't check the return value here since if we get + * a negative len, it means an error occured thus we + * simply remove it from the poll set and free the + * stream. + */ + } while (len > 0); + + /* It's ok to have an unavailable sub-buffer */ + if (len < 0 && len != -EAGAIN && len != -ENODATA) { + /* Clean up stream from consumer and free it. */ + lttng_poll_del(&events, stream->wait_fd); + consumer_del_metadata_stream(stream, metadata_ht); + } + } else if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Metadata fd %d is hup|err.", pollfd); if (!stream->hangup_flush_done && (consumer_data.type == LTTNG_CONSUMER32_UST @@ -2301,31 +2356,11 @@ restart: * and securely free the stream. */ consumer_del_metadata_stream(stream, metadata_ht); - } else if (revents & (LPOLLIN | LPOLLPRI)) { - /* Get the data out of the metadata file descriptor */ - DBG("Metadata available on fd %d", pollfd); - assert(stream->wait_fd == pollfd); - - do { - health_code_update(); - - len = ctx->on_buffer_ready(stream, ctx); - /* - * We don't check the return value here since if we get - * a negative len, it means an error occured thus we - * simply remove it from the poll set and free the - * stream. - */ - } while (len > 0); - - /* It's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN && len != -ENODATA) { - /* Clean up stream from consumer and free it. */ - lttng_poll_del(&events, stream->wait_fd); - consumer_del_metadata_stream(stream, metadata_ht); - } + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + rcu_read_unlock(); + goto end; } - /* Release RCU lock for the stream looked up */ rcu_read_unlock(); } @@ -2333,7 +2368,6 @@ restart: /* All is OK */ err = 0; -error: end: DBG("Metadata poll thread exiting"); @@ -2756,18 +2790,13 @@ void *consumer_thread_channel_poll(void *data) DBG("Channel main loop started"); while (1) { - health_code_update(); - - /* Only the channel pipe is set */ - if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { - err = 0; /* All is OK */ - goto end; - } - restart: - DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_code_update(); + DBG("Channel poll wait"); health_poll_entry(); ret = lttng_poll_wait(&events, -1); + DBG("Channel poll return from wait with %d fd(s)", + LTTNG_POLL_GETNB(&events)); health_poll_exit(); DBG("Channel event catched in thread"); if (ret < 0) { @@ -2775,6 +2804,9 @@ restart: ERR("Poll EINTR catched"); goto restart; } + if (LTTNG_POLL_GETNB(&events) == 0) { + err = 0; /* All is OK */ + } goto end; } @@ -2793,21 +2825,16 @@ restart: } if (pollfd == ctx->consumer_channel_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP)) { - DBG("Channel thread pipe hung up"); - /* - * Remove the pipe from the poll set and continue the loop - * since their might be data to consume. - */ - lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); - continue; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { enum consumer_channel_action action; uint64_t key; ret = read_channel_pipe(ctx, &chan, &key, &action); if (ret <= 0) { - ERR("Error reading channel pipe"); + if (ret < 0) { + ERR("Error reading channel pipe"); + } + lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); continue; } @@ -2824,7 +2851,7 @@ restart: rcu_read_unlock(); /* Add channel to the global poll events list */ lttng_poll_add(&events, chan->wait_fd, - LPOLLIN | LPOLLPRI); + LPOLLERR | LPOLLHUP); break; case CONSUMER_CHANNEL_DEL: { @@ -2884,6 +2911,17 @@ restart: ERR("Unknown action"); break; } + } else if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Channel thread pipe hung up"); + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto end; } /* Handle other stream */ @@ -2922,6 +2960,10 @@ restart: && !uatomic_read(&chan->nb_init_stream_left)) { consumer_del_channel(chan); } + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + rcu_read_unlock(); + goto end; } /* Release RCU lock for the channel looked up */ @@ -3656,22 +3698,19 @@ int consumer_send_status_channel(int sock, return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); } -/* - * Using a maximum stream size with the produced and consumed position of a - * stream, computes the new consumed position to be as close as possible to the - * maximum possible stream size. - * - * If maximum stream size is lower than the possible buffer size (produced - - * consumed), the consumed_pos given is returned untouched else the new value - * is returned. - */ -unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos, - unsigned long produced_pos, uint64_t max_stream_size) +unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, + unsigned long produced_pos, uint64_t nb_packets_per_stream, + uint64_t max_sb_size) { - if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) { - /* Offset from the produced position to get the latest buffers. */ - return produced_pos - max_stream_size; - } + unsigned long start_pos; - return consumed_pos; + if (!nb_packets_per_stream) { + return consumed_pos; /* Grab everything */ + } + start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size); + start_pos -= max_sb_size * nb_packets_per_stream; + if ((long) (start_pos - consumed_pos) < 0) { + return consumed_pos; /* Grab everything */ + } + return start_pos; }